This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit ef0f925a62fbfe0d3936ef2c9e321126db875181 Author: Alexis Manin <[email protected]> AuthorDate: Thu Sep 26 19:19:33 2019 +0200 fix(SQLStore): better handling of parallelization flag. Add a benchmark to test query spliterator flavors. --- storage/sis-sqlstore/pom.xml | 14 +- .../apache/sis/internal/sql/feature/Connector.java | 23 +++- .../sis/internal/sql/feature/FeatureAdapter.java | 2 +- .../sis/internal/sql/feature/QueryBuilder.java | 1 + .../sis/internal/sql/feature/QueryFeatureSet.java | 111 +++++++++++++--- .../sql/feature/QuerySpliteratorsBench.java | 142 +++++++++++++++++++++ .../apache/sis/internal/sql/feature/StreamSQL.java | 19 ++- .../org/apache/sis/internal/sql/feature/Table.java | 2 +- 8 files changed, 286 insertions(+), 28 deletions(-) diff --git a/storage/sis-sqlstore/pom.xml b/storage/sis-sqlstore/pom.xml index 45231fe..1e905f2 100644 --- a/storage/sis-sqlstore/pom.xml +++ b/storage/sis-sqlstore/pom.xml @@ -120,7 +120,7 @@ <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> - <scope>test</scope> + <scope>compile</scope> </dependency> <dependency> <groupId>org.hsqldb</groupId> @@ -132,6 +132,18 @@ <artifactId>postgresql</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>1.21</version> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>1.21</version> + </dependency> + </dependencies> </project> diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java index c99bd7f..9ce296c 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java @@ -8,8 +8,29 @@ import org.opengis.feature.Feature; 
import org.apache.sis.storage.DataStoreException; -public interface Connector { +/** + * Simple abstraction to describe a component capable of loading data from an SQL connection. + */ +interface Connector { + /** + * Triggers loading of data through an existing connection. + * + * @param connection The database connection to use for data loading. Note it is the caller's responsibility to close + * the connection, and it should not be done before the stream's terminal operation is over. + * @return Features loaded from input connection. It is recommended to implement lazy solutions, however it's an + * implementation-dependent choice. + * @throws SQLException If an error occurs while exchanging information with the database. + * @throws DataStoreException If a data-model-dependent error occurs. + */ Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException; + /** + * Provides an approximate query to summarize the data loaded. + * + * @param count Whether the query estimation is needed for a count operation, in which case the returned query should be + * a count query. + * @return SQL query describing the way this component loads data. Never null. However, implementations are free to + * throw {@link UnsupportedOperationException} if they do not support such an operation. + */ String estimateStatement(final boolean count); } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java index 0162e67..17a345a 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java @@ -57,7 +57,7 @@ class FeatureAdapter { return features; } - static class PropertyMapper { + static final class PropertyMapper { // TODO: by using an indexed implementation of Feature, we could avoid the name mapping.
However, a JMH benchmark // would be required in order to be sure it's impacting performance positively. final String propertyName; diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java index 68d798e..a78ed2a 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java @@ -2,6 +2,7 @@ package org.apache.sis.internal.sql.feature; /** * API to allow overridden SQL Stream to delegate a set of intermediate operations to native driver. + * TODO: move as inner interface of {@link StreamSQL} ? */ interface QueryBuilder { diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java index 0565036..fdf518d 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java @@ -4,6 +4,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import java.util.Spliterator; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -78,6 +79,13 @@ public class QueryFeatureSet extends AbstractFeatureSet { private final boolean distinct; /** + * Debug flag to activate (use {@link PrefetchSpliterator}) or deactivate (use {@link ResultSpliterator}) + * batch loading of results. + */ + boolean allowBatchLoading = true; + float fetchRatio = 0.5f; + + /** * Same as {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)}, except query is provided by a fixed text * instead of a builder.
*/ @@ -186,19 +194,21 @@ public class QueryFeatureSet extends AbstractFeatureSet { @Override public Stream<Feature> features(boolean parallel) { - return new StreamSQL(new QueryAdapter(queryBuilder), source); + return new StreamSQL(new QueryAdapter(queryBuilder, parallel), source, parallel); } private final class QueryAdapter implements QueryBuilder { private final SQLBuilder source; + private final boolean parallel; private long additionalOffset, additionalLimit; - QueryAdapter(SQLBuilder source) { + QueryAdapter(SQLBuilder source, boolean parallel) { // defensive copy this.source = new SQLBuilder(source); this.source.append(source.toString()); + this.parallel = parallel; } @Override @@ -236,7 +246,7 @@ public class QueryFeatureSet extends AbstractFeatureSet { } Features.addOffsetLimit(source, nativeOffset, nativeLimit); - return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit); + return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit, parallel); } throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019 } @@ -246,19 +256,25 @@ public class QueryFeatureSet extends AbstractFeatureSet { final String sql; private long additionalOffset, additionalLimit; + private final boolean parallel; - private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit) { + private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit, boolean parallel) { this.sql = sql; this.additionalOffset = additionalOffset; this.additionalLimit = additionalLimit; + this.parallel = parallel; } @Override public Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException { final PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(100); final ResultSet result = statement.executeQuery(); - - Stream<Feature> stream = 
StreamSupport.stream(new ResultSpliterator(result), false); + final int fetchSize = result.getFetchSize(); + final boolean withPrefetch = !allowBatchLoading || (fetchSize < 1 || fetchSize >= Integer.MAX_VALUE); + final Spliterator<Feature> spliterator = withPrefetch ? + new ResultSpliterator(result) : new PrefetchSpliterator(result, fetchRatio); + Stream<Feature> stream = StreamSupport.stream(spliterator, parallel && withPrefetch); if (additionalLimit > 0) stream = stream.limit(additionalLimit); if (additionalOffset > 0) stream = stream.skip(additionalOffset); @@ -276,19 +292,38 @@ public class QueryFeatureSet extends AbstractFeatureSet { @Override public String estimateStatement(boolean count) { + // Require analysis. Things could get complicated if original user query is already a count. throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019 } } - private final class ResultSpliterator implements Spliterator<Feature> { + private abstract class QuerySpliterator implements java.util.Spliterator<Feature> { final ResultSet result; - private ResultSpliterator(ResultSet result) { + private QuerySpliterator(ResultSet result) { this.result = result; } @Override + public long estimateSize() { + return originLimit > 0 ? originLimit : Long.MAX_VALUE; + } + + @Override + public int characteristics() { + // TODO: determine if it's order by analysing user query. SIZED is not possible, as limit is an upper threshold. + return Spliterator.IMMUTABLE | Spliterator.NONNULL | (distinct ? Spliterator.DISTINCT : 0); + } + } + + private final class ResultSpliterator extends QuerySpliterator { + + private ResultSpliterator(ResultSet result) { + super(result); + } + + @Override public boolean tryAdvance(Consumer<? 
super Feature> action) { try { if (result.next()) { @@ -305,23 +340,63 @@ public class QueryFeatureSet extends AbstractFeatureSet { public Spliterator<Feature> trySplit() { return null; } + } + + private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException { + return new SQLBuilder(conn.getMetaData(), true) + .append(query); + } + + private final class PrefetchSpliterator extends QuerySpliterator { + + final int fetchSize; + + int idx; + List<Feature> chunk; + + private PrefetchSpliterator(ResultSet result) throws SQLException { + this(result, 0.5f); + } + + private PrefetchSpliterator(ResultSet result, float fetchRatio) throws SQLException { + super(result); + this.fetchSize = Math.max((int) (result.getFetchSize()*fetchRatio), 1); + } @Override - public long estimateSize() { - // TODO: economic size estimation ? A count query seems overkill for the aim of this API. Howver, we could - // analyze user query in search for a limit value. - return originLimit > 0 ? originLimit : Long.MAX_VALUE; + public boolean tryAdvance(Consumer<? super Feature> action) { + if (ensureChunkAvailable()) { + action.accept(chunk.get(idx++)); + return true; + } + return false; + } + + public Spliterator<Feature> trySplit() { + if (!ensureChunkAvailable()) return null; + final Spliterator<Feature> chunkSpliterator = idx == 0 ? + chunk.spliterator() : chunk.subList(idx, chunk.size()).spliterator(); + chunk = null; + idx = 0; + return chunkSpliterator; } @Override public int characteristics() { - // TODO: determine if it's sorted by analysing user query. SIZED is not possible, as limit is an upper threshold. 
- return Spliterator.IMMUTABLE | Spliterator.NONNULL; + return super.characteristics() | Spliterator.CONCURRENT; } - } - private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException { - return new SQLBuilder(conn.getMetaData(), true) - .append(query); + private boolean ensureChunkAvailable() { + if (chunk == null || idx >= chunk.size()) { + idx = 0; + try { + chunk = adapter.prefetch(fetchSize, result); + } catch (SQLException e) { + throw new BackingStoreException(e); + } + return chunk != null && !chunk.isEmpty(); + } + return true; + } } } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java new file mode 100644 index 0000000..f2c62ef --- /dev/null +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java @@ -0,0 +1,142 @@ +package org.apache.sis.internal.sql.feature; + +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.sis.internal.metadata.sql.Initializer; + +import org.apache.derby.jdbc.EmbeddedDataSource; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.sis.util.ArgumentChecks.ensureStrictlyPositive; + +@Fork(value = 2, jvmArgs = {"-server", "-Xmx2g"} ) +@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS) 
+@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +public class QuerySpliteratorsBench { + + @State(Scope.Benchmark) + public static class DatabaseInput { + + @Param({"1000", "100000"}) + int numRows; + + @Param({"true", "false"}) + boolean parallel; + + @Param({"true", "false"}) + boolean prefetch; + + EmbeddedDataSource db; + QueryFeatureSet fs; + + public DatabaseInput() {} + + @Setup(Level.Trial) + public void setup() throws SQLException { + ensureStrictlyPositive("Number of rows", numRows); + + db = new EmbeddedDataSource(); + db.setDatabaseName("memory:spliterators"); + db.setDataSourceName("Apache SIS test database"); + db.setCreateDatabase("create"); + + try (Connection c = db.getConnection()) { + c.createStatement().execute( + "CREATE TABLE TEST (str CHARACTER VARYING(20), myInt INTEGER, myDouble DOUBLE)" + ); + final PreparedStatement st = c.prepareStatement("INSERT INTO TEST values (?, ?, ?)"); + + final Random rand = new Random(); + int rows = 1; + final byte[] txt = new byte[20]; + do { + for (int i = 0; i < 500 ; i++, rows++) { + rand.nextBytes(txt); + st.setString(1, new String(txt, StandardCharsets.US_ASCII)); + st.setInt(2, rand.nextInt()); + st.setDouble(3, rand.nextDouble()); + st.addBatch(); + } + st.executeBatch(); + st.clearBatch(); + } while (rows < numRows); + + fs = new QueryFeatureSet("SELECT * FROM TEST", db, c); + fs.allowBatchLoading = prefetch; + } + } + + @TearDown + public void dropDatabase() throws SQLException { + db.setCreateDatabase("no"); + db.setConnectionAttributes("drop=true"); + try { + db.getConnection().close(); + } catch (SQLException e) { // This is the expected exception. + if (!Initializer.isSuccessfulShutdown(e)) { + throw e; + } + } + } + } + + @Benchmark + public void test(DatabaseInput input) throws SQLException { + final int sum = input.fs.features(input.parallel).mapToInt(f -> 1).sum(); + if (sum != input.numRows) throw new AssertionError("..." + sum + "..." 
+ "WTF ?!"); + } +/* + @Test + public void benchmark() throws Exception { + System.out.println("COMMON POOL: "+ ForkJoinPool.getCommonPoolParallelism()); + final DatabaseInput db = new DatabaseInput(); + db.numRows = 100000; + db.parallel = true; + + long start = System.nanoTime(); + db.setup(); + System.out.println("Insertion time: "+((System.nanoTime()-start)/1e6)+" ms"); + + // warmup + for (int i = 0 ; i < 5 ; i++) { + test(db); + test(db); + } + + // go + long prefetch = 0, noprefetch = 0; + for (int i = 0 ; i < 100 ; i++) { + start = System.nanoTime(); + test(db); + prefetch += System.nanoTime()-start; + + start = System.nanoTime(); + test(db); + noprefetch += System.nanoTime()-start; + } + + System.out.println(String.format( + "Performances:%nP: %d%nI: %d", + (long) (prefetch / 1e7), (long) (noprefetch / 1e8) + )); + } +*/ + public static void main(String... args) throws Exception { + org.openjdk.jmh.Main.main(args); + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java index 890a9dd..a0e9888 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -78,13 +78,14 @@ class StreamSQL extends StreamDecoration<Feature> { private Consumer<SQLException> warningConsumer = e -> Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a connection", e); - StreamSQL(final Table source) { - this(new Features.Builder(source), source.source); + StreamSQL(final Table source, boolean parallel) { + this(new Features.Builder(source), source.source, parallel); } - StreamSQL(QueryBuilder queryAdapter, DataSource source) { + StreamSQL(QueryBuilder queryAdapter, DataSource source, boolean parallel) { this.queryAdapter = queryAdapter; this.source = source; + this.parallel = parallel; } public void 
setWarningConsumer(Consumer<SQLException> warningConsumer) { @@ -171,8 +172,14 @@ class StreamSQL extends StreamDecoration<Feature> { @Override public long count() { // Avoid opening a connection if sql text cannot be evaluated. - final String sql = select().estimateStatement(true); - try (Connection conn = source.getConnection()) { + final String sql; + try { + sql = select().estimateStatement(true); + } catch (UnsupportedOperationException e) { + // If underlying connector does not support query estimation, we will fall back on brute-force counting. + return super.count(); + } + try (Connection conn = QueryFeatureSet.connectReadOnly(source)) { try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) { if (rs.next()) { @@ -199,7 +206,7 @@ class StreamSQL extends StreamDecoration<Feature> { }) .onClose(() -> closeRef(connectionRef, true)); if (peekAction != null) featureStream = featureStream.peek(peekAction); - return parallel ? featureStream : featureStream.parallel(); + return parallel ? featureStream.parallel() : featureStream; } /** diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java index 80fc52b..81b8c75 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java @@ -418,7 +418,7 @@ final class Table extends AbstractFeatureSet { */ @Override public Stream<Feature> features(final boolean parallel) throws DataStoreException { - return new StreamSQL(this); + return new StreamSQL(this, parallel); } /**
