This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git


The following commit(s) were added to refs/heads/main by this push:
     new f4b28204 Improve hanlding of exceptions in the tile stream (#807)
f4b28204 is described below

commit f4b282049fbda04e6f7123ba39f282f6df924dc3
Author: Bertil Chapuis <[email protected]>
AuthorDate: Tue Dec 5 14:56:55 2023 +0100

    Improve hanlding of exceptions in the tile stream (#807)
---
 .../baremaps/stream/BufferedSpliterator.java       |  2 +-
 .../org/apache/baremaps/stream/StreamUtils.java    |  4 ++-
 .../tilestore/postgres/PostgresTileStore.java      |  9 ++---
 .../apache/baremaps/stream/StreamUtilsTest.java    | 42 ++++++++++++++++++++++
 .../tilestore/postgres/PostgresTileStoreTest.java  |  2 +-
 5 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
 
b/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
index 3a520933..6cc33854 100644
--- 
a/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
+++ 
b/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java
@@ -127,7 +127,7 @@ class BufferedSpliterator<T> implements 
Spliterator<CompletableFuture<T>> {
     @Override
     public <T> void registerCompletion(CompletableFuture<T> future,
         Consumer<CompletableFuture<T>> resultConsumer) {
-      future.thenAccept(result -> resultConsumer.accept(future));
+      future.whenComplete((result, error) -> resultConsumer.accept(future));
     }
   }
 
diff --git 
a/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java 
b/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java
index 349a2c06..31b53722 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java
@@ -151,7 +151,9 @@ public class StreamUtils {
     return buffer(asyncStream, completionOrder, bufferSize).map(f -> {
       try {
         return f.get();
-      } catch (InterruptedException | ExecutionException e) {
+      } catch (ExecutionException e) {
+        throw new StreamException(e);
+      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new StreamException(e);
       }
diff --git 
a/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
 
b/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
index 382adb98..8aa412a6 100644
--- 
a/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
+++ 
b/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java
@@ -153,7 +153,7 @@ public class PostgresTileStore implements TileStore {
 
           // Add a union between queries
           if (queryCount > 0) {
-            layerSql.append("UNION ");
+            layerSql.append("UNION ALL ");
           }
 
           // Add the sql to the layer sql
@@ -163,8 +163,9 @@ public class PostgresTileStore implements TileStore {
               .replace("?", "??")
               .replace("$zoom", String.valueOf(zoom));
           var querySqlWithParams = String.format(
-              "SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, 
t.tags, t.id " +
-                  "FROM (%s) AS t WHERE t.geom && ST_TileEnvelope(?, ?, ?, 
margin => (64.0/4096))",
+              "SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, 
t.tags - 'id' AS tags, t.id AS id "
+                  +
+                  "FROM (%s) AS t WHERE t.geom IS NOT NULL AND t.geom && 
ST_TileEnvelope(?, ?, ?, margin => (64.0/4096))",
               querySql);
           layerSql.append(querySqlWithParams);
 
@@ -195,7 +196,7 @@ public class PostgresTileStore implements TileStore {
     }
 
     // Add the tail of the tile sql
-    var tileQueryTail = " mvtTile";
+    var tileQueryTail = " AS mvtTile";
     tileSql.append(tileQueryTail);
 
     // Format the sql query
diff --git 
a/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java 
b/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java
index ee1cc354..23957274 100644
--- 
a/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java
+++ 
b/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java
@@ -18,8 +18,10 @@
 package org.apache.baremaps.stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.junit.jupiter.api.Test;
 
@@ -31,4 +33,44 @@ class StreamUtilsTest {
     List<List<Integer>> partitions = StreamUtils.partition(list.stream(), 
10).toList();
     assertEquals(partitions.size(), 10);
   }
+
+  @Test
+  void bufferInSourceOrder() {
+    List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+    List<Integer> l2 = StreamUtils.bufferInSourceOrder(l1.stream(), i -> i, 
10).toList();
+    assertEquals(l2.size(), l1.size());
+    assertEquals(l2, l1);
+  }
+
+  @Test
+  void bufferInSourceOrderWithException() {
+    assertThrows(StreamException.class, () -> {
+      List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+      Function<Integer, Integer> throwException = i -> {
+        throw new RuntimeException();
+      };
+      StreamUtils.bufferInSourceOrder(l1.stream(), throwException, 
10).sorted().toList();
+    });
+  }
+
+  @Test
+  void bufferInCompletionOrder() {
+    List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+    List<Integer> l2 =
+        StreamUtils.bufferInCompletionOrder(l1.stream(), i -> i, 
10).sorted().toList();
+    assertEquals(l2.size(), l1.size());
+    assertEquals(l2, l1);
+  }
+
+  @Test
+  void bufferInCompletionOrderWithException() {
+    assertThrows(StreamException.class, () -> {
+      List<Integer> l1 = IntStream.range(0, 100).boxed().toList();
+      Function<Integer, Integer> throwException = i -> {
+        throw new RuntimeException();
+      };
+      StreamUtils.bufferInCompletionOrder(l1.stream(), throwException, 
10).sorted().toList();
+    });
+  }
+
 }
diff --git 
a/baremaps-core/src/test/java/org/apache/baremaps/tilestore/postgres/PostgresTileStoreTest.java
 
b/baremaps-core/src/test/java/org/apache/baremaps/tilestore/postgres/PostgresTileStoreTest.java
index 24892ac1..ac51eb20 100644
--- 
a/baremaps-core/src/test/java/org/apache/baremaps/tilestore/postgres/PostgresTileStoreTest.java
+++ 
b/baremaps-core/src/test/java/org/apache/baremaps/tilestore/postgres/PostgresTileStoreTest.java
@@ -40,7 +40,7 @@ class PostgresTileStoreTest {
             List.of(new TilesetQuery(0, 20, "SELECT id, tags, geom FROM 
table")))));
     var query = PostgresTileStore.prepareQuery(tileset, 10);
     assertEquals(
-        "SELECT (SELECT ST_AsMVT(mvtGeom.*, 'a') FROM (SELECT 
ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, t.tags, t.id FROM 
(SELECT id, tags, geom FROM table) AS t WHERE t.geom && ST_TileEnvelope(?, ?, 
?, margin => (64.0/4096))) AS mvtGeom) || (SELECT ST_AsMVT(mvtGeom.*, 'b') FROM 
(SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, t.tags, t.id 
FROM (SELECT id, tags, geom FROM table) AS t WHERE t.geom && ST_TileEnvelope(?, 
?, ?, margin => (64.0/4096))) AS mvtGeom) [...]
+        "SELECT (SELECT ST_AsMVT(mvtGeom.*, 'a') FROM (SELECT 
ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, t.tags - 'id' AS tags, 
t.id AS id FROM (SELECT id, tags, geom FROM table) AS t WHERE t.geom IS NOT 
NULL AND t.geom && ST_TileEnvelope(?, ?, ?, margin => (64.0/4096))) AS mvtGeom) 
|| (SELECT ST_AsMVT(mvtGeom.*, 'b') FROM (SELECT ST_AsMVTGeom(t.geom, 
ST_TileEnvelope(?, ?, ?)) AS geom, t.tags - 'id' AS tags, t.id AS id FROM 
(SELECT id, tags, geom FROM table) AS t WHERE t.geo [...]
         query.sql());
   }
 }

Reply via email to