This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch 745-daylight
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit 51cc13bccc2d8bfb7661369d890ee7d147ed8a5c
Author: Bertil Chapuis <[email protected]>
AuthorDate: Sat Sep 23 19:36:10 2023 +0200

    Implement compression at the import stage
---
 .../org/apache/baremaps/utils/Compression.java     | 43 ++++++++++++++++++++++
 .../baremaps/workflow/tasks/DecompressFile.java    |  3 +-
 .../baremaps/workflow/tasks/ImportOsmChange.java   |  9 ++++-
 3 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/baremaps-core/src/main/java/org/apache/baremaps/utils/Compression.java b/baremaps-core/src/main/java/org/apache/baremaps/utils/Compression.java
new file mode 100644
index 00000000..4e204aaa
--- /dev/null
+++ b/baremaps-core/src/main/java/org/apache/baremaps/utils/Compression.java
@@ -0,0 +1,43 @@
+package org.apache.baremaps.utils;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public enum Compression {
+    none,
+    gzip,
+    bzip2;
+
+    public static Compression detect(Path file) {
+        if (file.toString().endsWith(".gz")) {
+            return gzip;
+        } else if (file.toString().endsWith(".bz2")) {
+            return bzip2;
+        } else {
+            return none;
+        }
+    }
+
+    public InputStream decompress(InputStream inputStream) throws IOException {
+        return switch (this) {
+            case gzip -> new GZIPInputStream(inputStream);
+            case bzip2 -> new BZip2CompressorInputStream(inputStream);
+            default -> inputStream;
+        };
+    }
+
+    public OutputStream compress(OutputStream outputStream) throws IOException {
+        return switch (this) {
+            case gzip -> new GZIPOutputStream(outputStream);
+            case bzip2 -> new BZip2CompressorOutputStream(outputStream);
+            default -> outputStream;
+        };
+    }
+}
diff --git a/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/DecompressFile.java b/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/DecompressFile.java
index 9cfb8b3b..7b1ed2de 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/DecompressFile.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/DecompressFile.java
@@ -19,6 +19,7 @@ import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipFile;
+
 import org.apache.baremaps.workflow.Task;
 import org.apache.baremaps.workflow.WorkflowContext;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -34,7 +35,7 @@ public record DecompressFile(Path source, Path target, Compression compression)
     targz,
     tarbz2,
     gzip,
-    bzip2
+    bzip2;
   }
 
   private static final Logger logger = LoggerFactory.getLogger(UngzipFile.class);
diff --git a/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/ImportOsmChange.java b/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/ImportOsmChange.java
index f32f8c6f..4066f75f 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/ImportOsmChange.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/ImportOsmChange.java
@@ -23,15 +23,20 @@ import org.apache.baremaps.openstreetmap.function.EntityProjectionTransformer;
 import org.apache.baremaps.openstreetmap.postgres.*;
 import org.apache.baremaps.openstreetmap.repository.ChangeImporter;
 import org.apache.baremaps.openstreetmap.xml.XmlChangeReader;
+import org.apache.baremaps.utils.Compression;
 import org.apache.baremaps.workflow.Task;
 import org.apache.baremaps.workflow.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public record ImportOsmChange(Path file, String database, Integer srid) implements Task {
+public record ImportOsmChange(Path file, String database, Integer srid, Compression compression) implements Task {
 
   private static final Logger logger = LoggerFactory.getLogger(ImportOsmChange.class);
 
+  public ImportOsmChange(Path file, String database, Integer srid) {
+    this (file, database, srid, Compression.detect(file));
+  }
+
   @Override
   public void execute(WorkflowContext context) throws Exception {
     var datasource = context.getDataSource(database);
@@ -49,7 +54,7 @@ public record ImportOsmChange(Path file, String database, Integer srid) implemen
     var prepareChange = consumeThenReturn(prepareGeometries);
     var importChange = new ChangeImporter(nodeRepository, wayRepository, relationRepository);
 
-    try (var changeInputStream = new BufferedInputStream(Files.newInputStream(file))) {
+    try (var changeInputStream = new BufferedInputStream(compression.decompress(Files.newInputStream(file)))) {
       new XmlChangeReader().stream(changeInputStream).map(prepareChange).forEach(importChange);
     }
   }

Reply via email to