This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 3f36c855cff67f19fc7dce84d9e7e4a507b5ae6e Author: Alexis Manin <[email protected]> AuthorDate: Tue Aug 27 14:32:31 2019 +0200 fix(SQLStore): fix peek operation management on overriden SQL stream. --- .../apache/sis/internal/sql/feature/StreamSQL.java | 71 ++++++++-------- .../org/apache/sis/storage/sql/SQLStoreTest.java | 97 ++++++++++++++++++---- 2 files changed, 121 insertions(+), 47 deletions(-) diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java index 1a59655..f6d8c3d 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -120,7 +120,7 @@ class StreamSQL extends StreamDecoration<Feature> { peekAction = action; } else { // Safe cast, because Stream values are strongly typed to O. - peekAction = peekAction.andThen((Consumer)action); + peekAction = peekAction.andThen((Consumer) action); } return this; @@ -149,7 +149,8 @@ class StreamSQL extends StreamDecoration<Feature> { throw new BackingStoreException("Cannot create SQL COUNT query", e); } try (Connection conn = queryBuilder.parent.source.getConnection()) { - try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(sql)) { + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { if (rs.next()) { return rs.getLong(1); } else return 0; @@ -161,19 +162,21 @@ class StreamSQL extends StreamDecoration<Feature> { @Override protected synchronized Stream<Feature> createDecoratedStream() { - final AtomicReference<Connection> connectionRef = new AtomicReference<>(); - return Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection())) - .map(Supplier::get) - .peek(connectionRef::set) - .flatMap(conn -> { - try { - final Features iter = queryBuilder.build(conn); - return StreamSupport.stream(iter, parallel).onClose(iter); - } catch (SQLException | DataStoreException e) { - throw new BackingStoreException(e); - } - }) - .onClose(() -> queryBuilder.parent.closeRef(connectionRef)); + final AtomicReference<Connection> connectionRef = new AtomicReference<>(); + Stream<Feature> featureStream = Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection())) + .map(Supplier::get) + .peek(connectionRef::set) + .flatMap(conn -> { + try { + final Features iter = queryBuilder.build(conn); + return StreamSupport.stream(iter, parallel).onClose(iter); + } catch (SQLException | DataStoreException e) { + throw new BackingStoreException(e); + } + }) + .onClose(() -> queryBuilder.parent.closeRef(connectionRef)); + if (peekAction != null) featureStream = featureStream.peek(peekAction); + return featureStream; } /** @@ -202,10 +205,9 @@ class StreamSQL extends StreamDecoration<Feature> { * @param <I> Type of object received as input of mapping operation. * @param <O> Return type of mapping operation. */ - private static class MappedStream<I, O> extends StreamDecoration<O> { - private final Function<? super I, ? extends O> mapper; + private static final class MappedStream<I, O> extends StreamDecoration<O> { + private Function<? super I, ? extends O> mapper; private Stream<I> source; - private Consumer<? super O> peekAction; private MappedStream(Function<? super I, ? extends O> mapper, Stream<I> source) { this.mapper = mapper; @@ -213,15 +215,8 @@ class StreamSQL extends StreamDecoration<Feature> { } @Override - @SuppressWarnings("unchecked") public Stream<O> peek(Consumer<? super O> action) { - if (peekAction == null) { - peekAction = action; - } else { - // Safe cast, because Stream values are strongly typed to O. - peekAction = peekAction.andThen((Consumer)action); - } - + mapper = concatenate(mapper, action); return this; } @@ -280,8 +275,7 @@ class StreamSQL extends StreamDecoration<Feature> { // Break possible infinite loop by sinking source content through its spliterator (terminal op). final Stream<I> sink = StreamSupport.stream(source.spliterator(), source.isParallel()); sink.onClose(source::close); - final Stream<O> result = sink.map(mapper); - return peekAction == null? result : result.peek(peekAction); + return sink.map(mapper); } } @@ -291,12 +285,10 @@ class StreamSQL extends StreamDecoration<Feature> { * * @param <T> Type of objects contained in source stream (before double mapping). */ - private static class ToDoubleStream<T> extends DoubleStreamDecoration { + private static final class ToDoubleStream<T> extends DoubleStreamDecoration { Stream<T> source; - final ToDoubleFunction<T> toDouble; - - private DoubleConsumer peekAction; + ToDoubleFunction<T> toDouble; private ToDoubleStream(Stream<T> source, ToDoubleFunction<T> toDouble) { this.source = source; @@ -305,7 +297,12 @@ class StreamSQL extends StreamDecoration<Feature> { @Override public DoubleStream peek(DoubleConsumer action) { - peekAction = peekAction == null? action : peekAction.andThen(action); + final ToDoubleFunction<T> toDoubleFixedRef = toDouble; + toDouble = t -> { + final double value = toDoubleFixedRef.applyAsDouble(t); + action.accept(value); + return value; + }; return this; } @@ -372,4 +369,12 @@ class StreamSQL extends StreamDecoration<Feature> { return sink.mapToDouble(toDouble); } } + + private static <I, O> Function<? super I, ? extends O> concatenate(final Function<? super I, ? extends O> function, final Consumer<? super O> consumer) { + return i -> { + final O o = function.apply(i); + consumer.accept(o); + return o; + }; + } } diff --git a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java index 7269f20..271bdb7 100644 --- a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java +++ b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java @@ -16,26 +16,37 @@ */ package org.apache.sis.storage.sql; -import java.util.Map; -import java.util.HashMap; -import java.util.HashSet; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; + +import org.opengis.feature.AttributeType; +import org.opengis.feature.Feature; +import org.opengis.feature.FeatureAssociationRole; +import org.opengis.feature.FeatureType; +import org.opengis.feature.PropertyType; + +import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.FeatureSet; import org.apache.sis.storage.StorageConnector; -import org.apache.sis.test.sql.TestDatabase; import org.apache.sis.test.TestCase; +import org.apache.sis.test.sql.TestDatabase; + import org.junit.Test; -import static org.apache.sis.test.Assert.*; +import static org.apache.sis.test.Assert.assertEquals; +import static org.apache.sis.test.Assert.assertInstanceOf; +import static org.apache.sis.test.Assert.assertNotEquals; +import static org.apache.sis.test.Assert.assertNotNull; +import static org.apache.sis.test.Assert.assertSame; +import static org.apache.sis.test.Assert.assertTrue; +import static org.apache.sis.test.Assert.fail; // Branch-dependent imports -import org.opengis.feature.Feature; -import org.opengis.feature.FeatureType; -import org.opengis.feature.PropertyType; -import org.opengis.feature.AttributeType; -import org.opengis.feature.FeatureAssociationRole; /** @@ -52,6 +63,13 @@ public final strictfp class SQLStoreTest extends TestCase { */ private static final String SCHEMA = "features"; + private static final int[] POPULATIONS = { + 13622267, // Tokyo, 2016. + 2206488, // Paris, 2017. + 1704694, // Montréal, 2016. + 531902 // Québec, 2016. + }; + /** * Number of time that the each country has been seen while iterating over the cities. */ @@ -130,6 +148,10 @@ public final strictfp class SQLStoreTest extends TestCase { try (Stream<Feature> features = cities.features(false)) { features.forEach((f) -> verifyContent(f)); } + + // Now, we'll check that overloaded stream operations are functionally stable, even stacked. + verifyStreamOperations(cities); + } } assertEquals(Integer.valueOf(2), countryCount.remove("CAN")); @@ -139,6 +161,53 @@ public final strictfp class SQLStoreTest extends TestCase { } /** + * Checks that operations stacked on feature stream are well executed. This test focus on mapping and peeking + * actions overloaded by sql streams. We'd like to test skip and limit operations too, but ignore it for now, + * because ordering of results matters for such a test. + * + * @implNote Most of stream operations used here are meaningless. We just want to ensure that the pipeline does not + * skip any operation. + * + * @param cities The feature set to read from. We expect a feature set containing all cities defined for the test + * class. + * @throws DataStoreException Let's propagate any error raised by input feature set. + */ + private static void verifyStreamOperations(final FeatureSet cities) throws DataStoreException { + try (Stream<Feature> features = cities.features(false)) { + final AtomicInteger peekCount = new AtomicInteger(); + final AtomicInteger mapCount = new AtomicInteger(); + final long populations = features.peek(f -> peekCount.incrementAndGet()) + .peek(f -> peekCount.incrementAndGet()) + .map(f -> { + mapCount.incrementAndGet(); + return f; + }) + .peek(f -> peekCount.incrementAndGet()) + .map(f -> { + mapCount.incrementAndGet(); + return f; + }) + .map(f -> f.getPropertyValue("population")) + .mapToDouble(obj -> ((Number) obj).doubleValue()) + .peek(f -> peekCount.incrementAndGet()) + .peek(f -> peekCount.incrementAndGet()) + .boxed() + .mapToDouble(d -> {mapCount.incrementAndGet(); return d;}) + .mapToObj(d -> {mapCount.incrementAndGet(); return d;}) + .mapToDouble(d -> {mapCount.incrementAndGet(); return d;}) + .map(d -> {mapCount.incrementAndGet(); return d;}) + .mapToLong(d -> (long) d) + .sum(); + + long expectedPopulations = 0; + for (long pop : POPULATIONS) expectedPopulations += pop; + assertEquals("Overall population count via Stream pipeline", expectedPopulations, populations); + assertEquals("Number of mapping (by element in the stream)", 24, mapCount.get()); + assertEquals("Number of peeking (by element in the stream)", 20, peekCount.get()); + } + } + + /** * Verifies the result of analyzing the structure of the {@code "Cities"} table. */ private static void verifyFeatureType(final FeatureType type, final String[] expectedNames, final Object[] expectedTypes) { @@ -178,7 +247,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Tōkyō"; country = "JPN"; countryName = "日本"; - population = 13622267; // In 2016. + population = POPULATIONS[0]; parks = new String[] {"Yoyogi-kōen", "Shinjuku Gyoen"}; break; } @@ -186,7 +255,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Paris"; country = "FRA"; countryName = "France"; - population = 2206488; // In 2017. + population = POPULATIONS[1]; parks = new String[] {"Tuileries Garden", "Luxembourg Garden"}; break; } @@ -194,7 +263,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Montreal"; country = "CAN"; countryName = "Canada"; - population = 1704694; // In 2016. + population = POPULATIONS[2]; isCanada = true; parks = new String[] {"Mount Royal"}; break; @@ -203,7 +272,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Quebec"; country = "CAN"; countryName = "Canada"; - population = 531902; // In 2016. + population = POPULATIONS[3]; isCanada = true; parks = new String[] {}; break;
