This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch fix-stream-exceptions in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 4be92946038881f89a39cb33c7b1c955a9498e35 Author: Bertil Chapuis <[email protected]> AuthorDate: Sat Dec 2 13:26:08 2023 +0100 Improve handling of exceptions in the tile stream --- .../baremaps/stream/BufferedSpliterator.java | 2 +- .../org/apache/baremaps/stream/StreamUtils.java | 1 - .../tilestore/postgres/PostgresTileStore.java | 6 ++-- .../apache/baremaps/stream/StreamUtilsTest.java | 42 ++++++++++++++++++++++ 4 files changed, 46 insertions(+), 5 deletions(-) diff --git a/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java b/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java index 3a520933..6cc33854 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/stream/BufferedSpliterator.java @@ -127,7 +127,7 @@ class BufferedSpliterator<T> implements Spliterator<CompletableFuture<T>> { @Override public <T> void registerCompletion(CompletableFuture<T> future, Consumer<CompletableFuture<T>> resultConsumer) { - future.thenAccept(result -> resultConsumer.accept(future)); + future.whenComplete((result, error) -> resultConsumer.accept(future)); } } diff --git a/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java b/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java index 349a2c06..707fd198 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/stream/StreamUtils.java @@ -152,7 +152,6 @@ public class StreamUtils { try { return f.get(); } catch (InterruptedException | ExecutionException e) { - Thread.currentThread().interrupt(); throw new StreamException(e); } }); diff --git a/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java b/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java index 382adb98..60cceffd 100644 --- 
a/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/tilestore/postgres/PostgresTileStore.java @@ -153,7 +153,7 @@ public class PostgresTileStore implements TileStore { // Add a union between queries if (queryCount > 0) { - layerSql.append("UNION "); + layerSql.append("UNION ALL "); } // Add the sql to the layer sql @@ -164,7 +164,7 @@ public class PostgresTileStore implements TileStore { .replace("$zoom", String.valueOf(zoom)); var querySqlWithParams = String.format( "SELECT ST_AsMVTGeom(t.geom, ST_TileEnvelope(?, ?, ?)) AS geom, t.tags, t.id " + - "FROM (%s) AS t WHERE t.geom && ST_TileEnvelope(?, ?, ?, margin => (64.0/4096))", + "FROM (%s) AS t WHERE t.geom IS NOT NULL AND t.geom && ST_TileEnvelope(?, ?, ?, margin => (64.0/4096))", querySql); layerSql.append(querySqlWithParams); @@ -195,7 +195,7 @@ public class PostgresTileStore implements TileStore { } // Add the tail of the tile sql - var tileQueryTail = " mvtTile"; + var tileQueryTail = " AS mvtTile"; tileSql.append(tileQueryTail); // Format the sql query diff --git a/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java b/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java index ee1cc354..23957274 100644 --- a/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java +++ b/baremaps-core/src/test/java/org/apache/baremaps/stream/StreamUtilsTest.java @@ -18,8 +18,10 @@ package org.apache.baremaps.stream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.List; +import java.util.function.Function; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -31,4 +33,44 @@ class StreamUtilsTest { List<List<Integer>> partitions = StreamUtils.partition(list.stream(), 10).toList(); assertEquals(partitions.size(), 10); } + + @Test + void bufferInSourceOrder() { + 
List<Integer> l1 = IntStream.range(0, 100).boxed().toList(); + List<Integer> l2 = StreamUtils.bufferInSourceOrder(l1.stream(), i -> i, 10).toList(); + assertEquals(l2.size(), l1.size()); + assertEquals(l2, l1); + } + + @Test + void bufferInSourceOrderWithException() { + assertThrows(StreamException.class, () -> { + List<Integer> l1 = IntStream.range(0, 100).boxed().toList(); + Function<Integer, Integer> throwException = i -> { + throw new RuntimeException(); + }; + StreamUtils.bufferInSourceOrder(l1.stream(), throwException, 10).sorted().toList(); + }); + } + + @Test + void bufferInCompletionOrder() { + List<Integer> l1 = IntStream.range(0, 100).boxed().toList(); + List<Integer> l2 = + StreamUtils.bufferInCompletionOrder(l1.stream(), i -> i, 10).sorted().toList(); + assertEquals(l2.size(), l1.size()); + assertEquals(l2, l1); + } + + @Test + void bufferInCompletionOrderWithException() { + assertThrows(StreamException.class, () -> { + List<Integer> l1 = IntStream.range(0, 100).boxed().toList(); + Function<Integer, Integer> throwException = i -> { + throw new RuntimeException(); + }; + StreamUtils.bufferInCompletionOrder(l1.stream(), throwException, 10).sorted().toList(); + }); + } + }
