This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch fix-stream-exceptions
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit 4be92946038881f89a39cb33c7b1c955a9498e35
Author: Bertil Chapuis <[email protected]>
AuthorDate: Sat Dec 2 13:26:08 2023 +0100

    Improve handling of exceptions in the tile stream
---
 .../baremaps/stream/BufferedSpliterator.java       |  2 +-
 .../org/apache/baremaps/stream/StreamUtils.java    |  1 -
 .../tilestore/postgres/PostgresTileStore.java      |  6 ++--
 .../apache/baremaps/stream/StreamUtilsTest.java    | 42 ++++++++++++++++++++++
 4 files changed, 46 insertions(+), 5 deletions(-)

diff --git 
a/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
 
b/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
index 3a520933..6cc33854 100644
--- 
a/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
+++ 
b/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
@@ -127,7 +127,7 @@ class BufferedSpliterator<T> implements 
Spliterator<CompletableFuture<T>> {
     @Override
     public <T> void registerCompletion(CompletableFuture<T> future,
         Consumer<CompletableFuture<T>> resultConsumer) {
-      future.thenAccept(result -> resultConsumer.accept(future));
+      future.whenComplete((result, error) -> resultConsumer.accept(future));
     }
   }
 
diff --git 
a/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java 
b/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java
index 349a2c06..707fd198 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java
@@ -152,7 +152,6 @@ public class StreamUtils {
       try {
         return f.get();
       } catch (InterruptedException | ExecutionException e) {
-        Thread.currentThread().interrupt();
         throw new StreamException(e);
       }
     });
diff --git 
a/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
 
b/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
index 382adb98..60cceffd 100644
--- 
a/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
+++ 
b/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
@@ -153,7 +153,7 @@ public class PostgresTileStore implements TileStore {
 
           // Add a union between queries
           if (queryCount > 0) {
-            layerSql.append("UNION ");
+            layerSql.append("UNION ALL ");
           }
 
           // Add the sql to the layer sql
@@ -164,7 +164,7 @@ public class PostgresTileStore implements TileStore {
               .replace("$zoom", String.valueOf(zoom));
           var querySqlWithParams = String.format(
               "SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, 
t.tags, t.id " +
-                  "FROM (%s) AS t WHERE t.geom && ST_TileEnvelope(?, ?, ?, 
margin => (64.0/4096))",
+                  "FROM (%s) AS t WHERE t.geom IS NOT NULL AND t.geom && 
ST_TileEnvelope(?, ?, ?, margin => (64.0/4096))",
               querySql);
           layerSql.append(querySqlWithParams);
 
@@ -195,7 +195,7 @@ public class PostgresTileStore implements TileStore {
     }
 
     // Add the tail of the tile sql
-    var tileQueryTail = " mvtTile";
+    var tileQueryTail = " AS mvtTile";
     tileSql.append(tileQueryTail);
 
     // Format the sql query
diff --git 
a/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java 
b/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java
index ee1cc354..23957274 100644
--- 
a/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java
+++ 
b/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java
@@ -18,8 +18,10 @@
 package org.apache.baremaps.stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.junit.jupiter.api.Test;
 
@@ -31,4 +33,44 @@ class StreamUtilsTest {
     List<List<Integer>> partitions = StreamUtils.partition(list.stream(), 
10).toList();
     assertEquals(partitions.size(), 10);
   }
+
+  @Test
+  void bufferInSourceOrder() {
+    List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+    List<Integer> l2 = StreamUtils.bufferInSourceOrder(l1.stream(), i -> i, 
10).toList();
+    assertEquals(l2.size(), l1.size());
+    assertEquals(l2, l1);
+  }
+
+  @Test
+  void bufferInSourceOrderWithException() {
+    assertThrows(StreamException.class, () -> {
+      List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+      Function<Integer, Integer> throwException = i -> {
+        throw new RuntimeException();
+      };
+      StreamUtils.bufferInSourceOrder(l1.stream(), throwException, 
10).sorted().toList();
+    });
+  }
+
+  @Test
+  void bufferInCompletionOrder() {
+    List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+    List<Integer> l2 =
+        StreamUtils.bufferInCompletionOrder(l1.stream(), i -> i, 
10).sorted().toList();
+    assertEquals(l2.size(), l1.size());
+    assertEquals(l2, l1);
+  }
+
+  @Test
+  void bufferInCompletionOrderWithException() {
+    assertThrows(StreamException.class, () -> {
+      List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+      Function<Integer, Integer> throwException = i -> {
+        throw new RuntimeException();
+      };
+      StreamUtils.bufferInCompletionOrder(l1.stream(), throwException, 
10).sorted().toList();
+    });
+  }
+
 }

Reply via email to