This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 249f0f37568a369dc7c9873af1dfd13d7327f559 Author: Alexis Manin <[email protected]> AuthorDate: Mon Sep 30 15:35:31 2019 +0200 wip(SQLStore): improve unit tests and minor cleanup. --- .../java/org/apache/sis/test/sql/TestDatabase.java | 49 +++++++++------- .../sis/internal/sql/feature/FeatureAdapter.java | 7 ++- .../apache/sis/internal/sql/feature/Features.java | 8 +-- .../sis/internal/sql/feature/QueryBuilder.java | 18 ------ .../sis/internal/sql/feature/QueryFeatureSet.java | 67 ++++++++++++++-------- .../sis/internal/sql/feature/SQLQueryBuilder.java | 28 --------- .../apache/sis/internal/sql/feature/StreamSQL.java | 16 ++++++ .../org/apache/sis/storage/sql/SQLStoreTest.java | 44 ++++++++------ 8 files changed, 123 insertions(+), 114 deletions(-) diff --git a/core/sis-metadata/src/test/java/org/apache/sis/test/sql/TestDatabase.java b/core/sis-metadata/src/test/java/org/apache/sis/test/sql/TestDatabase.java index a3d0255..9bb02a1 100644 --- a/core/sis-metadata/src/test/java/org/apache/sis/test/sql/TestDatabase.java +++ b/core/sis-metadata/src/test/java/org/apache/sis/test/sql/TestDatabase.java @@ -17,23 +17,27 @@ package org.apache.sis.test.sql; import java.io.IOException; -import javax.sql.DataSource; import java.sql.Connection; -import java.sql.Statement; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.SQLDataException; -import org.postgresql.PGProperty; -import org.postgresql.ds.PGSimpleDataSource; -import org.hsqldb.jdbc.JDBCDataSource; -import org.hsqldb.jdbc.JDBCPool; -import org.apache.derby.jdbc.EmbeddedDataSource; +import java.sql.SQLException; +import java.sql.Statement; +import javax.sql.DataSource; + import org.apache.sis.internal.metadata.sql.Initializer; import org.apache.sis.internal.metadata.sql.ScriptRunner; import org.apache.sis.test.TestCase; import org.apache.sis.util.Debug; -import static org.junit.Assume.*; +import org.apache.derby.jdbc.EmbeddedDataSource; + +import org.hsqldb.jdbc.JDBCDataSource; +import org.hsqldb.jdbc.JDBCPool; +import org.postgresql.PGProperty; +import org.postgresql.ds.PGSimpleDataSource; + +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; /** @@ -203,29 +207,32 @@ public strictfp class TestDatabase implements AutoCloseable { * Current version does not use pooling on the assumption * that connections to local host are fast enough. */ - try (Connection c = ds.getConnection()) { - try (ResultSet reflect = c.getMetaData().getSchemas(null, schema)) { - if (reflect.next()) { - throw new SQLDataException("Schema \"" + schema + "\" already exists in \"" + NAME + "\"."); + if (create) { + try (Connection c = ds.getConnection()) { + try (ResultSet reflect = c.getMetaData().getSchemas(null, schema)) { + if (reflect.next()) { + throw new SQLDataException("Schema \"" + schema + "\" already exists in \"" + NAME + "\"."); + } } - } - if (create) { try (Statement s = c.createStatement()) { s.execute("CREATE SCHEMA \"" + schema + '"'); } + + } catch (SQLException e) { + final String state = e.getSQLState(); + assumeFalse("This test needs a PostgreSQL server running on the local host.", "08001".equals(state)); + assumeFalse("This test needs a PostgreSQL database named \"" + NAME + "\".", "3D000".equals(state)); + throw e; } - } catch (SQLException e) { - final String state = e.getSQLState(); - assumeFalse("This test needs a PostgreSQL server running on the local host.", "08001".equals(state)); - assumeFalse("This test needs a PostgreSQL database named \"" + NAME + "\".", "3D000".equals(state)); - throw e; } return new TestDatabase(ds) { @Override public void close() throws SQLException { final PGSimpleDataSource ds = (PGSimpleDataSource) source; try (Connection c = ds.getConnection()) { try (Statement s = c.createStatement()) { - s.execute("DROP SCHEMA \"" + ds.getCurrentSchema() + "\" CASCADE"); + // Prevents test to hang indefinitely if connections are not properly released in test cases. + s.setQueryTimeout(10); + s.execute("DROP SCHEMA \"" + ds.getCurrentSchema() + "\" CASCADE;"); } } } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java index 17a345a..e346571 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java @@ -1,5 +1,6 @@ package org.apache.sis.internal.sql.feature; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -26,7 +27,7 @@ class FeatureAdapter { this.attributeMappers = Collections.unmodifiableList(new ArrayList<>(attributeMappers)); } - Feature read(final ResultSet cursor) throws SQLException { + Feature read(final ResultSet cursor, final Connection origin) throws SQLException { final Feature result = readAttributes(cursor); addImports(result, cursor); addExports(result); @@ -47,11 +48,11 @@ class FeatureAdapter { return result; } - List<Feature> prefetch(final int size, final ResultSet cursor) throws SQLException { + List<Feature> prefetch(final int size, final ResultSet cursor, final Connection origin) throws SQLException { // TODO: optimize by resolving import associations by batch import fetching. final ArrayList<Feature> features = new ArrayList<>(size); for (int i = 0 ; i < size && cursor.next() ; i++) { - features.add(read(cursor)); + features.add(read(cursor, origin)); } return features; diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java index 2950311..9559bd7 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java @@ -559,7 +559,7 @@ final class Features implements Spliterator<Feature> { } } - static class Builder implements QueryBuilder { + static class Builder implements StreamSQL.QueryBuilder { final Table parent; long limit, offset; @@ -582,19 +582,19 @@ final class Features implements Spliterator<Feature> { } @Override - public QueryBuilder limit(long limit) { + public StreamSQL.QueryBuilder limit(long limit) { this.limit = limit; return this; } @Override - public QueryBuilder offset(long offset) { + public StreamSQL.QueryBuilder offset(long offset) { this.offset = offset; return this; } @Override - public QueryBuilder distinct(boolean activate) { + public StreamSQL.QueryBuilder distinct(boolean activate) { this.distinct = activate; return this; } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java deleted file mode 100644 index a78ed2a..0000000 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.sis.internal.sql.feature; - -/** - * API to allow overrided SQL Stream to delegate a set of intermediate operations to native driver. - * TODO: move as inner interface of {@link StreamSQL} ? - */ -interface QueryBuilder { - - QueryBuilder limit(long limit); - - QueryBuilder offset(long offset); - - default QueryBuilder distinct() { return distinct(true); } - - QueryBuilder distinct(boolean activate); - - Connector select(ColumnRef... columns); -} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java index dd0741e..6c16b6a 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java @@ -20,6 +20,7 @@ import org.apache.sis.internal.metadata.sql.SQLBuilder; import org.apache.sis.internal.storage.AbstractFeatureSet; import org.apache.sis.storage.DataStoreException; import org.apache.sis.util.collection.BackingStoreException; +import org.apache.sis.util.logging.WarningListeners; /** * Stores SQL query given at built time, and execute it when calling {@link #features(boolean) data stream}. Note that @@ -126,20 +127,20 @@ public class QueryFeatureSet extends AbstractFeatureSet { * can use {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection) another constructor}. */ QueryFeatureSet(SQLBuilder queryBuilder, Analyzer analyzer, DataSource source, Connection conn) throws SQLException { - super(analyzer.listeners); - this.source = source; + this(queryBuilder, createAdapter(queryBuilder, analyzer, conn), analyzer.listeners, source, conn); + } - String sql = queryBuilder.toString(); - try (PreparedStatement statement = conn.prepareStatement(sql)) { - final SQLTypeSpecification spec = analyzer.create(statement, sql, null); - adapter = analyzer.buildAdapter(spec); - } + QueryFeatureSet(SQLBuilder queryBuilder, FeatureAdapter adapter, WarningListeners listeners, DataSource source, Connection conn) throws SQLException { + super(listeners); + this.source = source; + this.adapter = adapter; /* We will now try to parse offset and limit from input query. If we encounter unsupported/ambiguous case, * we will fallback to pure java management of additional limit and offset. * If we successfully retrieve offset and limit, we'll modify user query to take account of additional * parameters given later. */ + String sql = queryBuilder.toString(); long tmpOffset = 0, tmpLimit = 0; try { Matcher matcher = OFFSET_PATTERN.matcher(sql); @@ -205,7 +206,7 @@ public class QueryFeatureSet extends AbstractFeatureSet { return new StreamSQL(new QueryAdapter(queryBuilder, parallel), source, parallel); } - private final class QueryAdapter implements QueryBuilder { + private final class QueryAdapter implements StreamSQL.QueryBuilder { private final SQLBuilder source; private final boolean parallel; @@ -220,19 +221,19 @@ public class QueryFeatureSet extends AbstractFeatureSet { } @Override - public QueryBuilder limit(long limit) { + public StreamSQL.QueryBuilder limit(long limit) { additionalLimit = limit; return this; } @Override - public QueryBuilder offset(long offset) { + public StreamSQL.QueryBuilder offset(long offset) { additionalOffset = offset; return this; } @Override - public QueryBuilder distinct(boolean activate) { + public StreamSQL.QueryBuilder distinct(boolean activate) { if (distinct == activate) return this; throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019 } @@ -260,9 +261,21 @@ public class QueryFeatureSet extends AbstractFeatureSet { } } + private static FeatureAdapter createAdapter(SQLBuilder queryBuilder, Analyzer analyzer, Connection conn) throws SQLException { + String sql = queryBuilder.toString(); + try (PreparedStatement statement = conn.prepareStatement(sql)) { + final SQLTypeSpecification spec = analyzer.create(statement, sql, null); + return analyzer.buildAdapter(spec); + } + } + private final class PreparedQueryConnector implements Connector { final String sql; + /** + * In some cases, detection/modification of SQL offset and limit parameters can fail. In such cases, we amend + * result stream with pure java {@link Stream#skip(long) offset} and {@link Stream#limit(long) limit}. + */ private long additionalOffset, additionalLimit; private final boolean parallel; @@ -281,7 +294,7 @@ public class QueryFeatureSet extends AbstractFeatureSet { final int fetchSize = result.getFetchSize(); final boolean withPrefetch = !allowBatchLoading || (fetchSize < 1 || fetchSize >= Integer.MAX_VALUE); final Spliterator<Feature> spliterator = withPrefetch ? - new ResultSpliterator(result) : new PrefetchSpliterator(result, fetchRatio); + new ResultSpliterator(result, connection) : new PrefetchSpliterator(result, connection, fetchRatio); Stream<Feature> stream = StreamSupport.stream(spliterator, parallel && withPrefetch); if (additionalLimit > 0) stream = stream.limit(additionalLimit); if (additionalOffset > 0) stream = stream.skip(additionalOffset); @@ -300,8 +313,12 @@ public class QueryFeatureSet extends AbstractFeatureSet { @Override public String estimateStatement(boolean count) { - // Require analysis. Things could get complicated if original user query is already a count. - throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019 + if (count) { + // We should check if user query is already a count operation, in which case we would return "select 1" + throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019 + } else { + return sql; + } } } @@ -319,9 +336,11 @@ public class QueryFeatureSet extends AbstractFeatureSet { private abstract class QuerySpliterator implements java.util.Spliterator<Feature> { final ResultSet result; + final Connection origin; - private QuerySpliterator(ResultSet result) { + private QuerySpliterator(ResultSet result, Connection origin) { this.result = result; + this.origin = origin; } @Override @@ -338,15 +357,15 @@ public class QueryFeatureSet extends AbstractFeatureSet { private final class ResultSpliterator extends QuerySpliterator { - private ResultSpliterator(ResultSet result) { - super(result); + private ResultSpliterator(ResultSet result, Connection origin) { + super(result, origin); } @Override public boolean tryAdvance(Consumer<? super Feature> action) { try { if (result.next()) { - final Feature f = adapter.read(result); + final Feature f = adapter.read(result, origin); action.accept(f); return true; } else return false; @@ -371,7 +390,7 @@ public class QueryFeatureSet extends AbstractFeatureSet { * there's not much improvement regarding to naive streaming approach. IMHO, two improvements would really impact * performance positively if done: * <ul> - * <li>Optimisation of batch loading through {@link FeatureAdapter#prefetch(int, ResultSet)}</li> + * <li>Optimisation of batch loading through {@link FeatureAdapter#prefetch(int, ResultSet, Connection)}</li> * <li>Better splitting balance, as stated by {@link Spliterator#trySplit()}</li> * </ul> */ @@ -387,12 +406,12 @@ public class QueryFeatureSet extends AbstractFeatureSet { */ long splittedAmount; - private PrefetchSpliterator(ResultSet result) throws SQLException { - this(result, 0.5f); + private PrefetchSpliterator(ResultSet result, Connection origin) throws SQLException { + this(result, origin, 0.5f); } - private PrefetchSpliterator(ResultSet result, float fetchRatio) throws SQLException { - super(result); + private PrefetchSpliterator(ResultSet result, Connection origin, float fetchRatio) throws SQLException { + super(result, origin); this.fetchSize = Math.max((int) (result.getFetchSize()*fetchRatio), 1); } @@ -430,7 +449,7 @@ public class QueryFeatureSet extends AbstractFeatureSet { if (chunk == null || idx >= chunk.size()) { idx = 0; try { - chunk = adapter.prefetch(fetchSize, result); + chunk = adapter.prefetch(fetchSize, result, origin); } catch (SQLException e) { throw new BackingStoreException(e); } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryBuilder.java deleted file mode 100644 index 55337d7..0000000 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryBuilder.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.sis.internal.sql.feature; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.SQLException; -import javax.sql.DataSource; - -import org.apache.sis.internal.metadata.sql.SQLBuilder; -import org.apache.sis.storage.DataStoreException; -import org.apache.sis.storage.FeatureSet; - -public class SQLQueryBuilder extends SQLBuilder { - - final DataSource source; - - public SQLQueryBuilder(DataSource source, final DatabaseMetaData metadata, final boolean quoteSchema) throws SQLException { - super(metadata, quoteSchema); - this.source = source; - } - - public FeatureSet build(final Connection connection) throws SQLException, DataStoreException { - final Analyzer analyzer = new Analyzer(source, connection.getMetaData(), null, null); - // TODO: defensive copy of this builder. - return new QueryFeatureSet(this, analyzer, source, connection); - } - - -} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java index a0e9888..30820a9 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -407,4 +407,20 @@ class StreamSQL extends StreamDecoration<Feature> { } } } + + /** + * API to allow overrided SQL Stream to delegate a set of intermediate operations to native driver. + */ + interface QueryBuilder { + + QueryBuilder limit(long limit); + + QueryBuilder offset(long offset); + + default QueryBuilder distinct() { return distinct(true); } + + QueryBuilder distinct(boolean activate); + + Connector select(ColumnRef... columns); + } } diff --git a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java index 2929194..6345fbc 100644 --- a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java +++ b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java @@ -185,7 +185,7 @@ public final strictfp class SQLStoreTest extends TestCase { final QueryFeatureSet canadaCities; try (Connection conn = source.getConnection()) { final SQLBuilder builder = new SQLBuilder(conn.getMetaData(), false) - .append("SELECT * FROM ").appendIdentifier("features", "Cities"); + .append("SELECT * FROM ").appendIdentifier(SCHEMA, "Cities"); allCities = new QueryFeatureSet(builder, source, conn); /* By re-using the same builder, we ensure a defensive copy is done at feature set creation, avoiding * potential concurrent or security issue due to afterward modification of the query. @@ -207,17 +207,22 @@ public final strictfp class SQLStoreTest extends TestCase { expectedResults.add(city("CAN", "Montréal", "Montreal", 1704694)); expectedResults.add(city("CAN", "Québec", "Quebec", 531902)); - Set<Map<String, Object>> result = canadaCities.features(false) - .map(SQLStoreTest::asMap) - .collect(Collectors.toSet()); + Set<Map<String, Object>> result; + try (Stream<Feature> features = canadaCities.features(false)) { + result = features + .map(SQLStoreTest::asMap) + .collect(Collectors.toSet()); + } assertEquals("Query result is not consistent with expected one", expectedResults, result); expectedResults.add(city("FRA", "Paris", "Paris", 2206488)); expectedResults.add(city("JPN", "東京", "Tōkyō", 13622267)); - result = allCities.features(false) - .map(SQLStoreTest::asMap) - .collect(Collectors.toSet()); + try (Stream<Feature> features = allCities.features(false)) { + result = features + .map(SQLStoreTest::asMap) + .collect(Collectors.toSet()); + } assertEquals("Query result is not consistent with expected one", expectedResults, result); } @@ -266,7 +271,7 @@ public final strictfp class SQLStoreTest extends TestCase { private void verifyLimitOffsetAndColumnSelectionFromQuery(final DataSource source) throws Exception { // Ensure multiline text is accepted final String query = "SELECT \"english_name\" as \"title\" \n\r" + - "FROM features.\"Parks\" \n" + + "FROM "+SCHEMA+".\"Parks\" \n" + "ORDER BY \"english_name\" ASC \n" + "OFFSET 2 ROWS FETCH NEXT 3 ROWS ONLY"; final QueryFeatureSet qfs; @@ -286,9 +291,13 @@ public final strictfp class SQLStoreTest extends TestCase { assertEquals("Column length constraint should be visible from attribute type.", 20, ((AttributeType)precision).getDefaultValue()); assertFalse("Built feature type should have exactly one attribute.", props.hasNext()); - Function<Stream<Feature>, String[]> getNames = in -> in - .map(f -> f.getPropertyValue("title").toString()) - .toArray(size -> new String[size]); + Function<Stream<Feature>, String[]> getNames = in -> { + try (Stream<Feature> closeable = in) { + return in + .map(f -> f.getPropertyValue("title").toString()) + .toArray(size -> new String[size]); + } + }; String[] parkNames = getNames.apply( qfs.features(false) @@ -319,16 +328,19 @@ public final strictfp class SQLStoreTest extends TestCase { */ private void verifyDistinctQuery(DataSource source) throws SQLException { // Ensure multiline text is accepted - final String query = "SELECT \"country\" FROM features.\"Parks\" ORDER BY \"country\""; + final String query = "SELECT \"country\" FROM "+SCHEMA+".\"Parks\" ORDER BY \"country\""; final QueryFeatureSet qfs; try (Connection conn = source.getConnection()) { qfs = new QueryFeatureSet(query, source, conn); } - final Object[] expected = qfs.features(false) - .distinct() - .map(f -> f.getPropertyValue("country")) - .toArray(); + final Object[] expected; + try (Stream<Feature> features = qfs.features(false)) { + expected = features + .distinct() + .map(f -> f.getPropertyValue("country")) + .toArray(); + } assertArrayEquals("Distinct country names, sorted in ascending order", new String[]{"CAN", "FRA", "JPN"}, expected); }
