This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 708faa26882f98eb743c94ff37d6950a3c03e3f5 Author: Alexis Manin <[email protected]> AuthorDate: Thu Sep 26 19:19:33 2019 +0200 fix(SQLStore): better handling of parallelization flag. Add a benchmark to test query spliterator flavors. --- storage/sis-sqlstore/pom.xml | 14 +- .../apache/sis/internal/sql/feature/Analyzer.java | 26 ++-- .../apache/sis/internal/sql/feature/Connector.java | 23 +++- .../sis/internal/sql/feature/FeatureAdapter.java | 2 +- .../sis/internal/sql/feature/PrimaryKey.java | 12 +- .../sis/internal/sql/feature/QueryBuilder.java | 1 + .../sis/internal/sql/feature/QueryFeatureSet.java | 149 +++++++++++++++++--- .../sql/feature/QuerySpliteratorsBench.java | 150 +++++++++++++++++++++ .../internal/sql/feature/SQLTypeSpecification.java | 62 ++++++++- .../apache/sis/internal/sql/feature/StreamSQL.java | 19 ++- .../org/apache/sis/internal/sql/feature/Table.java | 2 +- .../sis/internal/sql/feature/package-info.java | 5 + 12 files changed, 418 insertions(+), 47 deletions(-) diff --git a/storage/sis-sqlstore/pom.xml b/storage/sis-sqlstore/pom.xml index 45231fe..1e905f2 100644 --- a/storage/sis-sqlstore/pom.xml +++ b/storage/sis-sqlstore/pom.xml @@ -120,7 +120,7 @@ <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> - <scope>test</scope> + <scope>compile</scope> </dependency> <dependency> <groupId>org.hsqldb</groupId> @@ -132,6 +132,18 @@ <artifactId>postgresql</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>1.21</version> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>1.21</version> + </dependency> + </dependencies> </project> diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java index 6ac8fd7..9348d01 100644 --- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java @@ -22,6 +22,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.*; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.LogRecord; import javax.sql.DataSource; @@ -140,6 +141,7 @@ final class Analyzer { * The namespace created with {@link #catalog} and {@link #schema}. */ private transient NameSpace namespace; + public static final Supplier<GenericName> RANDOME_NAME = () -> Names.createGenericName("sis", ":", UUID.randomUUID().toString()); /** * Creates a new analyzer for the database described by given metadata. @@ -334,9 +336,9 @@ final class Analyzer { public FeatureAdapter buildAdapter(final SQLTypeSpecification spec) throws SQLException { final FeatureTypeBuilder builder = new FeatureTypeBuilder(nameFactory, functions.library, locale); - builder.setName(spec.getName() == null ? Names.createGenericName("sis", ":", UUID.randomUUID().toString()) : spec.getName()); - builder.setDefinition(spec.getDefinition()); - final String geomCol = spec.getPrimaryGeometryColumn().orElse(""); + builder.setName(spec.getName().orElseGet(RANDOME_NAME)); + spec.getDefinition().ifPresent(builder::setDefinition); + final String geomCol = spec.getPrimaryGeometryColumn().map(ColumnRef::getAttributeName).orElse(""); final List pkCols = spec.getPK().map(PrimaryKey::getColumns).orElse(Collections.EMPTY_LIST); List<PropertyMapper> attributes = new ArrayList<>(); // JDBC column indices are 1 based. @@ -470,15 +472,15 @@ final class Analyzer { } @Override - public GenericName getName() { - return id.getName(Analyzer.this); + public Optional<GenericName> getName() { + return Optional.of(id.getName(Analyzer.this)); } /** * The remarks are opportunistically stored in id.freeText if known by the caller. 
*/ @Override - public String getDefinition() throws SQLException { + public Optional<String> getDefinition() throws SQLException { String remarks = id.freeText; if (id instanceof Relation) { try (ResultSet reflect = metadata.getTables(id.catalog, schemaEsc, tableEsc, null)) { @@ -493,7 +495,7 @@ final class Analyzer { } } } - return remarks; + return Optional.ofNullable(remarks); } @Override @@ -551,7 +553,7 @@ final class Analyzer { } @Override - public Optional<String> getPrimaryGeometryColumn() { + public Optional<ColumnRef> getPrimaryGeometryColumn() { return Optional.empty(); //throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 20/09/2019 } @@ -589,13 +591,13 @@ final class Analyzer { } @Override - public GenericName getName() throws SQLException { - return name; + public Optional<GenericName> getName() throws SQLException { + return Optional.of(name); } @Override - public String getDefinition() throws SQLException { - return query; + public Optional<String> getDefinition() throws SQLException { + return Optional.of(query); } @Override diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java index c99bd7f..9ce296c 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java @@ -8,8 +8,29 @@ import org.opengis.feature.Feature; import org.apache.sis.storage.DataStoreException; -public interface Connector { +/** + * Simple abstraction to describe a component capable of loading data from an SQL connection. Used + */ +interface Connector { + /** + * Triggers Loading of data through an existing connection. + * + * @param connection The database connection to use for data loading. 
Note it is the caller's responsibility to close + * the connection, and it should not be done before stream terminal operation is over. + * @return Features loaded from input connection. It is recommended to implement lazy solutions, however it's an + * implementation-dependent choice. + * @throws SQLException If an error occurs while exchanging information with the database. + * @throws DataStoreException If a data-model-dependent error occurs. + */ Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException; + /** + * Provides an approximate query to summarize loaded data. + * + * @param count True if the query estimation is needed for a count operation, in which case the returned query should be + * a count query. + * @return SQL query describing the way this component loads data. Never null. However, implementations are free to + * throw {@link UnsupportedOperationException} if they do not support such an operation. + */ String estimateStatement(final boolean count); } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java index 0162e67..17a345a 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java @@ -57,7 +57,7 @@ class FeatureAdapter { return features; } - static class PropertyMapper { + static final class PropertyMapper { // TODO: by using a indexed implementation of Feature, we could avoid the name mapping. However, a JMH benchmark // would be required in order to be sure it's impacting performance positively.
final String propertyName; diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java index 1e68fd3..e220abc 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java @@ -10,6 +10,9 @@ import org.apache.sis.util.ArgumentChecks; /** * Represents SQL primary key constraint. Main information is columns composing the key. * + * @implNote For now, only list of columns composing the key are returned. However, in the future it would be possible + * to add other information, as a value type to describe how to expose primary key value. + * * @author "Alexis Manin (Geomatys)" */ interface PrimaryKey { @@ -20,13 +23,16 @@ interface PrimaryKey { return Optional.of(new Composite(cols)); } - //Class<T> getViewType(); + /** + * + * @return List of column names composing the key. Should neither be null nor empty. 
+ */ List<String> getColumns(); class Simple implements PrimaryKey { final String column; - public Simple(String column) { + Simple(String column) { this.column = column; } @@ -40,7 +46,7 @@ interface PrimaryKey { */ private final List<String> columns; - public Composite(List<String> columns) { + Composite(List<String> columns) { ArgumentChecks.ensureNonEmpty("Primary key column names", columns); this.columns = Collections.unmodifiableList(new ArrayList<>(columns)); } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java index 68d798e..a78ed2a 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java @@ -2,6 +2,7 @@ package org.apache.sis.internal.sql.feature; /** * API to allow overrided SQL Stream to delegate a set of intermediate operations to native driver. + * TODO: move as inner interface of {@link StreamSQL} ? 
*/ interface QueryBuilder { diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java index 0565036..ffbdc8b 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java @@ -4,6 +4,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import java.util.Spliterator; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -78,6 +79,21 @@ public class QueryFeatureSet extends AbstractFeatureSet { private final boolean distinct; /** + * Debug flag to activate (use {@link PrefetchSpliterator}) or de-activate (use {@link ResultSpliterator}) + * batch loading of results. + */ + boolean allowBatchLoading = true; + /** + * Profiling variable. Define the fraction (0 none, 1 all) of a single fetch (as defined by {@link ResultSet#getFetchSize()} + * that {@link PrefetchSpliterator} will load in one go. + */ + float fetchRatio = 0.5f; + /** + * Profiling variable, serves to define {{@link PreparedStatement#setFetchSize(int)} SQL result fetch size}. + */ + int fetchSize = 100; + + /** * Same as {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)}, except query is provided by a fixed text * instead of a builder. 
*/ @@ -186,19 +202,21 @@ public class QueryFeatureSet extends AbstractFeatureSet { @Override public Stream<Feature> features(boolean parallel) { - return new StreamSQL(new QueryAdapter(queryBuilder), source); + return new StreamSQL(new QueryAdapter(queryBuilder, parallel), source, parallel); } private final class QueryAdapter implements QueryBuilder { private final SQLBuilder source; + private final boolean parallel; private long additionalOffset, additionalLimit; - QueryAdapter(SQLBuilder source) { + QueryAdapter(SQLBuilder source, boolean parallel) { // defensive copy this.source = new SQLBuilder(source); this.source.append(source.toString()); + this.parallel = parallel; } @Override @@ -236,7 +254,7 @@ public class QueryFeatureSet extends AbstractFeatureSet { } Features.addOffsetLimit(source, nativeOffset, nativeLimit); - return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit); + return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit, parallel); } throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019 } @@ -246,19 +264,25 @@ public class QueryFeatureSet extends AbstractFeatureSet { final String sql; private long additionalOffset, additionalLimit; + private final boolean parallel; - private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit) { + private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit, boolean parallel) { this.sql = sql; this.additionalOffset = additionalOffset; this.additionalLimit = additionalLimit; + this.parallel = parallel; } @Override public Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException { final PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(fetchSize); final ResultSet result = statement.executeQuery(); - - Stream<Feature> stream = 
StreamSupport.stream(new ResultSpliterator(result), false); + final int fetchSize = result.getFetchSize(); + final boolean withPrefetch = !allowBatchLoading || (fetchSize < 1 || fetchSize >= Integer.MAX_VALUE); + final Spliterator<Feature> spliterator = withPrefetch ? + new ResultSpliterator(result) : new PrefetchSpliterator(result, fetchRatio); + Stream<Feature> stream = StreamSupport.stream(spliterator, parallel && withPrefetch); if (additionalLimit > 0) stream = stream.limit(additionalLimit); if (additionalOffset > 0) stream = stream.skip(additionalOffset); @@ -276,19 +300,49 @@ public class QueryFeatureSet extends AbstractFeatureSet { @Override public String estimateStatement(boolean count) { + // Require analysis. Things could get complicated if original user query is already a count. throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019 } } - private final class ResultSpliterator implements Spliterator<Feature> { + /** + * Base class for loading SQL query result loading through {@link Spliterator} API. Concrete implementations comes + * in two experimental flavors : + * <ul> + * <li>Sequential streaming: {@link ResultSpliterator}</li> + * <li>Parallelizable batch streaming: {@link PrefetchSpliterator}</li> + * </ul> + * + * {@link QuerySpliteratorsBench A benchmark is available} to compare both implementations, which could be useful in + * the future to determine which implementation to priorize. For now results does not show much difference. + */ + private abstract class QuerySpliterator implements java.util.Spliterator<Feature> { final ResultSet result; - private ResultSpliterator(ResultSet result) { + private QuerySpliterator(ResultSet result) { this.result = result; } @Override + public long estimateSize() { + return originLimit > 0 ? originLimit : Long.MAX_VALUE; + } + + @Override + public int characteristics() { + // TODO: determine if it's order by analysing user query. 
SIZED is not possible, as limit is an upper threshold. + return Spliterator.IMMUTABLE | Spliterator.NONNULL | (distinct ? Spliterator.DISTINCT : 0); + } + } + + private final class ResultSpliterator extends QuerySpliterator { + + private ResultSpliterator(ResultSet result) { + super(result); + } + + @Override public boolean tryAdvance(Consumer<? super Feature> action) { try { if (result.next()) { @@ -305,23 +359,84 @@ public class QueryFeatureSet extends AbstractFeatureSet { public Spliterator<Feature> trySplit() { return null; } + } + + private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException { + return new SQLBuilder(conn.getMetaData(), true) + .append(query); + } + + /** + * An attempt to optimize feature loading through batching and potential parallelization. For now, it looks like + * there's not much improvement regarding to naive streaming approach. IMHO, two improvements would really impact + * performance positively if done: + * <ul> + * <li>Optimisation of batch loading through {@link FeatureAdapter#prefetch(int, ResultSet)}</li> + * <li>Better splitting balance, as stated by {@link Spliterator#trySplit()}</li> + * </ul> + */ + private final class PrefetchSpliterator extends QuerySpliterator { + + final int fetchSize; + + int idx; + List<Feature> chunk; + /** + * According to {@link Spliterator#trySplit()} documentation, the original size estimation must be reduced after + * split to remain consistent. + */ + long splittedAmount = 0; + + private PrefetchSpliterator(ResultSet result) throws SQLException { + this(result, 0.5f); + } + + private PrefetchSpliterator(ResultSet result, float fetchRatio) throws SQLException { + super(result); + this.fetchSize = Math.max((int) (result.getFetchSize()*fetchRatio), 1); + } + + @Override + public boolean tryAdvance(Consumer<? 
super Feature> action) { + if (ensureChunkAvailable()) { + action.accept(chunk.get(idx++)); + return true; + } + return false; + } + + public Spliterator<Feature> trySplit() { + if (!ensureChunkAvailable()) return null; + final List<Feature> remainingChunk = chunk.subList(idx, chunk.size()); + splittedAmount += remainingChunk.size(); + final Spliterator<Feature> chunkSpliterator = idx == 0 ? + chunk.spliterator() : remainingChunk.spliterator(); + chunk = null; + idx = 0; + return chunkSpliterator; + } @Override public long estimateSize() { - // TODO: economic size estimation ? A count query seems overkill for the aim of this API. Howver, we could - // analyze user query in search for a limit value. - return originLimit > 0 ? originLimit : Long.MAX_VALUE; + return super.estimateSize() - splittedAmount; } @Override public int characteristics() { - // TODO: determine if it's sorted by analysing user query. SIZED is not possible, as limit is an upper threshold. - return Spliterator.IMMUTABLE | Spliterator.NONNULL; + return super.characteristics() | Spliterator.CONCURRENT; } - } - private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException { - return new SQLBuilder(conn.getMetaData(), true) - .append(query); + private boolean ensureChunkAvailable() { + if (chunk == null || idx >= chunk.size()) { + idx = 0; + try { + chunk = adapter.prefetch(fetchSize, result); + } catch (SQLException e) { + throw new BackingStoreException(e); + } + return chunk != null && !chunk.isEmpty(); + } + return true; + } } } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java new file mode 100644 index 0000000..bb519d9 --- /dev/null +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java @@ -0,0 +1,150 @@ +package org.apache.sis.internal.sql.feature; 
+ +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.sis.internal.metadata.sql.Initializer; + +import org.apache.derby.jdbc.EmbeddedDataSource; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.sis.util.ArgumentChecks.ensureStrictlyPositive; + +@Fork(value = 2, jvmArgs = {"-server", "-Xmx2g"} ) +@Warmup(iterations = 2, time = 4, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 2, time = 4, timeUnit = TimeUnit.SECONDS) +public class QuerySpliteratorsBench { + + @State(Scope.Benchmark) + public static class DatabaseInput { + + @Param({"1000", "100000"}) + int numRows; + + @Param({"true", "false"}) + boolean parallel; + + @Param({"true", "false"}) + boolean prefetch; + + @Param({"10", "100", "1000"}) + int fetchSize; + + @Param({"0.5", "1", "2"}) + float fetchRatio; + + EmbeddedDataSource db; + QueryFeatureSet fs; + + public DatabaseInput() {} + + @Setup(Level.Trial) + public void setup() throws SQLException { + ensureStrictlyPositive("Number of rows", numRows); + + db = new EmbeddedDataSource(); + db.setDatabaseName("memory:spliterators"); + db.setDataSourceName("Apache SIS test database"); + db.setCreateDatabase("create"); + + try (Connection c = db.getConnection()) { + c.createStatement().execute( + "CREATE TABLE TEST (str CHARACTER VARYING(20), myInt INTEGER, myDouble DOUBLE)" + ); + final PreparedStatement st = c.prepareStatement("INSERT INTO TEST values (?, ?, ?)"); + + 
final Random rand = new Random(); + int rows = 1; + final byte[] txt = new byte[20]; + do { + for (int i = 0; i < 500 ; i++, rows++) { + rand.nextBytes(txt); + st.setString(1, new String(txt, StandardCharsets.US_ASCII)); + st.setInt(2, rand.nextInt()); + st.setDouble(3, rand.nextDouble()); + st.addBatch(); + } + st.executeBatch(); + st.clearBatch(); + } while (rows < numRows); + + fs = new QueryFeatureSet("SELECT * FROM TEST", db, c); + fs.allowBatchLoading = prefetch; + fs.fetchSize = fetchSize; + fs.fetchRatio = fetchRatio; + } + } + + @TearDown + public void dropDatabase() throws SQLException { + db.setCreateDatabase("no"); + db.setConnectionAttributes("drop=true"); + try { + db.getConnection().close(); + } catch (SQLException e) { // This is the expected exception. + if (!Initializer.isSuccessfulShutdown(e)) { + throw e; + } + } + } + } + + @Benchmark + public void test(DatabaseInput input) throws SQLException { + final int sum = input.fs.features(input.parallel).mapToInt(f -> 1).sum(); + if (sum != input.numRows) throw new AssertionError("..." + sum + "..." + "WTF ?!"); + } +/* + @Test + public void benchmark() throws Exception { + System.out.println("COMMON POOL: "+ ForkJoinPool.getCommonPoolParallelism()); + final DatabaseInput db = new DatabaseInput(); + db.numRows = 100000; + db.parallel = true; + + long start = System.nanoTime(); + db.setup(); + System.out.println("Insertion time: "+((System.nanoTime()-start)/1e6)+" ms"); + + // warmup + for (int i = 0 ; i < 5 ; i++) { + test(db); + test(db); + } + + // go + long prefetch = 0, noprefetch = 0; + for (int i = 0 ; i < 100 ; i++) { + start = System.nanoTime(); + test(db); + prefetch += System.nanoTime()-start; + + start = System.nanoTime(); + test(db); + noprefetch += System.nanoTime()-start; + } + + System.out.println(String.format( + "Performances:%nP: %d%nI: %d", + (long) (prefetch / 1e7), (long) (noprefetch / 1e8) + )); + } +*/ + public static void main(String... 
args) throws Exception { + org.openjdk.jmh.Main.main(args); + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java index 0e35edf..e54823d 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java @@ -1,35 +1,87 @@ package org.apache.sis.internal.sql.feature; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.Optional; import org.opengis.util.GenericName; +import org.apache.sis.feature.builder.FeatureTypeBuilder; +import org.apache.sis.internal.feature.AttributeConvention; import org.apache.sis.storage.DataStoreContentException; +/** + * Defines an application schema inferred from an SQL database (query, table, etc.). Implementations will be used by + * {@link Analyzer} to create an {@link FeatureAdapter adaptation layer to the feature model}. Default implementations + * can be retrieved for tables and queries respectively through {@link Analyzer#create(TableReference, TableReference)} + * and {@link Analyzer#create(PreparedStatement, String, GenericName)} methods. + */ interface SQLTypeSpecification { /** + * Identifying name for the application schema. It is strongly recommended to be present, for SIS engine to be + * capable to create insightful models. However, in corner cases where no proper names could be provided, an empty + * value is allowed. * - * @return Name for the feature type to build. Nullable. + * @implNote SIS {@link FeatureTypeBuilder feature type builder} <em>requires</em> a name, and current + * {@link Analyzer#buildAdapter(SQLTypeSpecification) analysis implementation} will create a random UUID if + * necessary. 
+ * + * @return Name for the feature type to build. * @throws SQLException If an error occurs while retrieving information from database. */ - GenericName getName() throws SQLException; + Optional<GenericName> getName() throws SQLException; /** + * Gives an optional description of the application schema. This information is not necessary for any kind of + * computation, but allows giving the end user global information about the schema they are manipulating. * - * @return A succint description of the data source. Nullable. + * @return A brief description of the data source. * @throws SQLException If an error occurs while retrieving information from database. */ - String getDefinition() throws SQLException; + Optional<String> getDefinition() throws SQLException; + /** + * Primary key definition of source schema. Can be empty if no primary key is defined (Example: query definition). + * + * @return Primary key definition if any, otherwise an empty shell. + * @throws SQLException If an error occurs while exchanging information with underlying database. + */ Optional<PrimaryKey> getPK() throws SQLException; + /** + * + * @return Ordered list of columns in application schema. Order is important, and will be relied upon to retrieve + * {@link ResultSet#getObject(int) result values by index}. + */ List<SQLColumn> getColumns(); + /** + * + * @return All identified relations based on a foreign key in <em>current</em> application schema (1..1 or n..1). + * Corresponds to {@link Relation.Direction#IMPORT}. Can be empty but not null. + * + * @throws SQLException If an error occurs while exchanging information with underlying database. + */ List<Relation> getImports() throws SQLException; + /** + * + * @return All identified relations based on a foreign key located in <em>another</em> application schema (1..n). + * Corresponds to {@link Relation.Direction#EXPORT}. Can be empty but not null.
+ * @throws SQLException If an error occurs while exchanging information with underlying database. + * @throws DataStoreContentException If a schema problem is encountered. + */ List<Relation> getExports() throws SQLException, DataStoreContentException; - default Optional<String> getPrimaryGeometryColumn() {return Optional.empty();} + /** + * In case target schema contains geographic information, this serves to identify without ambiguity which column + * contains what could be considered main geolocation (as stated by {@link AttributeConvention#GEOMETRY_PROPERTY}). + * This is a very important information in case application schema contains multiple geometric fields. + * + * @return The name of the column/attribute to be considered as main geometry information, or an empty shell if + * unknown. + */ + default Optional<ColumnRef> getPrimaryGeometryColumn() {return Optional.empty();} } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java index 890a9dd..a0e9888 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -78,13 +78,14 @@ class StreamSQL extends StreamDecoration<Feature> { private Consumer<SQLException> warningConsumer = e -> Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a connection", e); - StreamSQL(final Table source) { - this(new Features.Builder(source), source.source); + StreamSQL(final Table source, boolean parallel) { + this(new Features.Builder(source), source.source, parallel); } - StreamSQL(QueryBuilder queryAdapter, DataSource source) { + StreamSQL(QueryBuilder queryAdapter, DataSource source, boolean parallel) { this.queryAdapter = queryAdapter; this.source = source; + this.parallel = parallel; } public void setWarningConsumer(Consumer<SQLException> 
warningConsumer) { @@ -171,8 +172,14 @@ class StreamSQL extends StreamDecoration<Feature> { @Override public long count() { // Avoid opening a connection if sql text cannot be evaluated. - final String sql = select().estimateStatement(true); - try (Connection conn = source.getConnection()) { + final String sql; + try { + sql = select().estimateStatement(true); + } catch (UnsupportedOperationException e) { + // If underlying connector does not support query estimation, we will fallback on brut-force counting. + return super.count(); + } + try (Connection conn = QueryFeatureSet.connectReadOnly(source)) { try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) { if (rs.next()) { @@ -199,7 +206,7 @@ class StreamSQL extends StreamDecoration<Feature> { }) .onClose(() -> closeRef(connectionRef, true)); if (peekAction != null) featureStream = featureStream.peek(peekAction); - return parallel ? featureStream : featureStream.parallel(); + return parallel ? featureStream.parallel() : featureStream; } /** diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java index 80fc52b..81b8c75 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java @@ -418,7 +418,7 @@ final class Table extends AbstractFeatureSet { */ @Override public Stream<Feature> features(final boolean parallel) throws DataStoreException { - return new StreamSQL(this); + return new StreamSQL(this, parallel); } /** diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java index 2c198a8..19c3151 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java +++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java @@ -25,8 +25,13 @@ * This package is for internal use by SIS only. Classes in this package * may change in incompatible ways in any future version without notice. * + * @implNote Feature type analysis is done through {@link org.apache.sis.internal.sql.feature.Analyzer} class. + * It relies on internal {@link org.apache.sis.internal.sql.feature.SQLTypeSpecification} API to fetch SQL schema + * information, and build {@link org.apache.sis.internal.sql.feature.FeatureAdapter an adapter to feature model from it}. + * * @author Johann Sorel (Geomatys) * @author Martin Desruisseaux (Geomatys) + * @author Alexis Manin (Geomatys) * @version 1.0 * @since 1.0 * @module
