This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 708faa26882f98eb743c94ff37d6950a3c03e3f5
Author: Alexis Manin <[email protected]>
AuthorDate: Thu Sep 26 19:19:33 2019 +0200

    fix(SQLStore): better handling of parallelization flag. Add a benchmark to 
test query spliterator flavors.
---
 storage/sis-sqlstore/pom.xml                       |  14 +-
 .../apache/sis/internal/sql/feature/Analyzer.java  |  26 ++--
 .../apache/sis/internal/sql/feature/Connector.java |  23 +++-
 .../sis/internal/sql/feature/FeatureAdapter.java   |   2 +-
 .../sis/internal/sql/feature/PrimaryKey.java       |  12 +-
 .../sis/internal/sql/feature/QueryBuilder.java     |   1 +
 .../sis/internal/sql/feature/QueryFeatureSet.java  | 149 +++++++++++++++++---
 .../sql/feature/QuerySpliteratorsBench.java        | 150 +++++++++++++++++++++
 .../internal/sql/feature/SQLTypeSpecification.java |  62 ++++++++-
 .../apache/sis/internal/sql/feature/StreamSQL.java |  19 ++-
 .../org/apache/sis/internal/sql/feature/Table.java |   2 +-
 .../sis/internal/sql/feature/package-info.java     |   5 +
 12 files changed, 418 insertions(+), 47 deletions(-)

diff --git a/storage/sis-sqlstore/pom.xml b/storage/sis-sqlstore/pom.xml
index 45231fe..1e905f2 100644
--- a/storage/sis-sqlstore/pom.xml
+++ b/storage/sis-sqlstore/pom.xml
@@ -120,7 +120,7 @@
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
-      <scope>test</scope>
+      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.hsqldb</groupId>
@@ -132,6 +132,18 @@
       <artifactId>postgresql</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <version>1.21</version>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.21</version>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
index 6ac8fd7..9348d01 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
@@ -22,6 +22,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.function.Supplier;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import javax.sql.DataSource;
@@ -140,6 +141,7 @@ final class Analyzer {
      * The namespace created with {@link #catalog} and {@link #schema}.
      */
     private transient NameSpace namespace;
+    public static final Supplier<GenericName> RANDOME_NAME = () -> 
Names.createGenericName("sis", ":", UUID.randomUUID().toString());
 
     /**
      * Creates a new analyzer for the database described by given metadata.
@@ -334,9 +336,9 @@ final class Analyzer {
 
     public FeatureAdapter buildAdapter(final SQLTypeSpecification spec) throws 
SQLException {
         final FeatureTypeBuilder builder = new FeatureTypeBuilder(nameFactory, 
functions.library, locale);
-        builder.setName(spec.getName() == null ? 
Names.createGenericName("sis", ":", UUID.randomUUID().toString()) : 
spec.getName());
-        builder.setDefinition(spec.getDefinition());
-        final String geomCol = spec.getPrimaryGeometryColumn().orElse("");
+        builder.setName(spec.getName().orElseGet(RANDOME_NAME));
+        spec.getDefinition().ifPresent(builder::setDefinition);
+        final String geomCol = 
spec.getPrimaryGeometryColumn().map(ColumnRef::getAttributeName).orElse("");
         final List pkCols = 
spec.getPK().map(PrimaryKey::getColumns).orElse(Collections.EMPTY_LIST);
         List<PropertyMapper> attributes = new ArrayList<>();
         // JDBC column indices are 1 based.
@@ -470,15 +472,15 @@ final class Analyzer {
         }
 
         @Override
-        public GenericName getName() {
-            return id.getName(Analyzer.this);
+        public Optional<GenericName> getName() {
+            return Optional.of(id.getName(Analyzer.this));
         }
 
         /**
          * The remarks are opportunistically stored in id.freeText if known by 
the caller.
          */
         @Override
-        public String getDefinition() throws SQLException {
+        public Optional<String> getDefinition() throws SQLException {
             String remarks = id.freeText;
             if (id instanceof Relation) {
                 try (ResultSet reflect = metadata.getTables(id.catalog, 
schemaEsc, tableEsc, null)) {
@@ -493,7 +495,7 @@ final class Analyzer {
                     }
                 }
             }
-            return remarks;
+            return Optional.ofNullable(remarks);
         }
 
         @Override
@@ -551,7 +553,7 @@ final class Analyzer {
         }
 
         @Override
-        public Optional<String> getPrimaryGeometryColumn() {
+        public Optional<ColumnRef> getPrimaryGeometryColumn() {
             return Optional.empty();
             //throw new UnsupportedOperationException("Not supported yet"); // 
"Alexis Manin (Geomatys)" on 20/09/2019
         }
@@ -589,13 +591,13 @@ final class Analyzer {
         }
 
         @Override
-        public GenericName getName() throws SQLException {
-            return name;
+        public Optional<GenericName> getName() throws SQLException {
+            return Optional.of(name);
         }
 
         @Override
-        public String getDefinition() throws SQLException {
-            return query;
+        public Optional<String> getDefinition() throws SQLException {
+            return Optional.of(query);
         }
 
         @Override
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
index c99bd7f..9ce296c 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
@@ -8,8 +8,29 @@ import org.opengis.feature.Feature;
 
 import org.apache.sis.storage.DataStoreException;
 
-public interface Connector {
+/**
+ * Simple abstraction to describe a component capable of loading data from an 
SQL connection. Used internally by the SQL store to trigger data loading lazily.
+ */
+interface Connector {
+    /**
+     * Triggers loading of data through an existing connection.
+     *
+     * @param connection The database connection to use for data loading. Note 
it is the caller's responsibility to close
+     *                   the connection, and it should not be done before 
stream terminal operation is over.
+     * @return Features loaded from input connection. It is recommended to 
implement lazy solutions, however it's an
+     * implementation-dependent choice.
+     * @throws SQLException If an error occurs while exchanging information 
with the database.
+     * @throws DataStoreException If a data-model-dependent error occurs.
+     */
     Stream<Feature> connect(Connection connection) throws SQLException, 
DataStoreException;
 
+    /**
+     * Provides an approximate query summarizing the data loaded.
+     *
+     * @param count True if the query estimation is needed for a count 
operation, in which case the returned query should be
+     *              a count query.
+     * @return SQL query describing the way this component loads data. Never 
null. However, implementations are free to
+     * throw {@link UnsupportedOperationException} if they do not support such 
an operation.
+     */
     String estimateStatement(final boolean count);
 }
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
index 0162e67..17a345a 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
@@ -57,7 +57,7 @@ class FeatureAdapter {
         return features;
     }
 
-    static class PropertyMapper {
+    static final class PropertyMapper {
         // TODO: by using a indexed implementation of Feature, we could avoid 
the name mapping. However, a JMH benchmark
         // would be required in order to be sure it's impacting performance 
positively.
         final String propertyName;
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
index 1e68fd3..e220abc 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
@@ -10,6 +10,9 @@ import org.apache.sis.util.ArgumentChecks;
 /**
  * Represents SQL primary key constraint. Main information is columns 
composing the key.
  *
+ * @implNote For now, only the list of columns composing the key is returned. 
However, in the future it would be possible
+ * to add other information, such as a value type describing how to expose the 
primary key value.
+ *
  * @author "Alexis Manin (Geomatys)"
  */
 interface PrimaryKey {
@@ -20,13 +23,16 @@ interface PrimaryKey {
         return Optional.of(new Composite(cols));
     }
 
-    //Class<T> getViewType();
+    /**
+     *
+     * @return List of column names composing the key. Should neither be null 
nor empty.
+     */
     List<String> getColumns();
 
     class Simple implements PrimaryKey {
         final String column;
 
-        public Simple(String column) {
+        Simple(String column) {
             this.column = column;
         }
 
@@ -40,7 +46,7 @@ interface PrimaryKey {
          */
         private final List<String> columns;
 
-        public Composite(List<String> columns) {
+        Composite(List<String> columns) {
             ArgumentChecks.ensureNonEmpty("Primary key column names", columns);
             this.columns = Collections.unmodifiableList(new 
ArrayList<>(columns));
         }
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
index 68d798e..a78ed2a 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
@@ -2,6 +2,7 @@ package org.apache.sis.internal.sql.feature;
 
 /**
  * API to allow overrided SQL Stream to delegate a set of intermediate 
operations to native driver.
+ * TODO: move as inner interface of {@link StreamSQL} ?
  */
 interface QueryBuilder {
 
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
index 0565036..ffbdc8b 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
@@ -4,6 +4,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
@@ -78,6 +79,21 @@ public class QueryFeatureSet extends AbstractFeatureSet {
     private final boolean distinct;
 
     /**
+     * Debug flag to activate (use {@link PrefetchSpliterator}) or de-activate 
(use {@link ResultSpliterator})
+     * batch loading of results.
+     */
+    boolean allowBatchLoading = true;
+    /**
+     * Profiling variable. Defines the fraction (0 none, 1 all) of a single 
fetch (as defined by {@link ResultSet#getFetchSize()})
+     * that {@link PrefetchSpliterator} will load in one go.
+     */
+    float fetchRatio = 0.5f;
+    /**
+     * Profiling variable, serves to define the {@link 
PreparedStatement#setFetchSize(int) SQL result fetch size}.
+     */
+    int fetchSize = 100;
+
+    /**
      * Same as {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)}, 
except query is provided by a fixed text
      * instead of a builder.
      */
@@ -186,19 +202,21 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
     @Override
     public Stream<Feature> features(boolean parallel) {
-        return new StreamSQL(new QueryAdapter(queryBuilder), source);
+        return new StreamSQL(new QueryAdapter(queryBuilder, parallel), source, 
parallel);
     }
 
     private final class QueryAdapter implements QueryBuilder {
 
         private final SQLBuilder source;
+        private final boolean parallel;
 
         private long additionalOffset, additionalLimit;
 
-        QueryAdapter(SQLBuilder source) {
+        QueryAdapter(SQLBuilder source, boolean parallel) {
             // defensive copy
             this.source = new SQLBuilder(source);
             this.source.append(source.toString());
+            this.parallel = parallel;
         }
 
         @Override
@@ -236,7 +254,7 @@ public class QueryFeatureSet extends AbstractFeatureSet {
                 }
 
                 Features.addOffsetLimit(source, nativeOffset, nativeLimit);
-                return new PreparedQueryConnector(source.toString(), 
javaOffset, javaLimit);
+                return new PreparedQueryConnector(source.toString(), 
javaOffset, javaLimit, parallel);
             }
             throw new UnsupportedOperationException("Not supported yet: 
modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
         }
@@ -246,19 +264,25 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
         final String sql;
         private long additionalOffset, additionalLimit;
+        private final boolean parallel;
 
-        private PreparedQueryConnector(String sql, long additionalOffset, long 
additionalLimit) {
+        private PreparedQueryConnector(String sql, long additionalOffset, long 
additionalLimit, boolean parallel) {
             this.sql = sql;
             this.additionalOffset = additionalOffset;
             this.additionalLimit = additionalLimit;
+            this.parallel = parallel;
         }
 
         @Override
         public Stream<Feature> connect(Connection connection) throws 
SQLException, DataStoreException {
             final PreparedStatement statement = 
connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+            statement.setFetchSize(fetchSize);
             final ResultSet result = statement.executeQuery();
-
-            Stream<Feature> stream = StreamSupport.stream(new 
ResultSpliterator(result), false);
+            final int fetchSize = result.getFetchSize();
+            final boolean withPrefetch = !allowBatchLoading || (fetchSize < 1 
|| fetchSize >= Integer.MAX_VALUE);
+            final Spliterator<Feature> spliterator = withPrefetch ?
+                    new ResultSpliterator(result) : new 
PrefetchSpliterator(result, fetchRatio);
+            Stream<Feature> stream = StreamSupport.stream(spliterator, 
parallel && withPrefetch);
             if (additionalLimit > 0) stream = stream.limit(additionalLimit);
             if (additionalOffset > 0) stream = stream.skip(additionalOffset);
 
@@ -276,19 +300,49 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
         @Override
         public String estimateStatement(boolean count) {
+            // Require analysis. Things could get complicated if original user 
query is already a count.
             throw new UnsupportedOperationException("Not supported yet"); // 
"Alexis Manin (Geomatys)" on 24/09/2019
         }
     }
 
-    private final class ResultSpliterator implements Spliterator<Feature> {
+    /**
+     * Base class for loading SQL query results through the {@link 
Spliterator} API. Concrete implementations come
+     * in two experimental flavors:
+     * <ul>
+     *     <li>Sequential streaming: {@link ResultSpliterator}</li>
+     *     <li>Parallelizable batch  streaming: {@link 
PrefetchSpliterator}</li>
+     * </ul>
+     *
+     * {@link QuerySpliteratorsBench A benchmark is available} to compare both 
implementations, which could be useful in
+     * the future to determine which implementation to prioritize. For now 
results do not show much difference.
+     */
+    private abstract class QuerySpliterator  implements 
java.util.Spliterator<Feature> {
 
         final ResultSet result;
 
-        private ResultSpliterator(ResultSet result) {
+        private QuerySpliterator(ResultSet result) {
             this.result = result;
         }
 
         @Override
+        public long estimateSize() {
+            return originLimit > 0 ? originLimit : Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            // TODO: determine if it's order by analysing user query. SIZED is 
not possible, as limit is an upper threshold.
+            return Spliterator.IMMUTABLE | Spliterator.NONNULL | (distinct ? 
Spliterator.DISTINCT : 0);
+        }
+    }
+
+    private final class ResultSpliterator extends QuerySpliterator {
+
+        private ResultSpliterator(ResultSet result) {
+            super(result);
+        }
+
+        @Override
         public boolean tryAdvance(Consumer<? super Feature> action) {
             try {
                 if (result.next()) {
@@ -305,23 +359,84 @@ public class QueryFeatureSet extends AbstractFeatureSet {
         public Spliterator<Feature> trySplit() {
             return null;
         }
+    }
+
+    private static SQLBuilder fromQuery(final String query, final Connection 
conn) throws SQLException {
+        return new SQLBuilder(conn.getMetaData(), true)
+                .append(query);
+    }
+
+    /**
+     * An attempt to optimize feature loading through batching and potential 
parallelization. For now, it looks like
+     * there's not much improvement compared to the naive streaming approach. 
IMHO, two improvements would really impact
+     * performance positively if done:
+     * <ul>
+     *     <li>Optimisation of batch loading through {@link 
FeatureAdapter#prefetch(int, ResultSet)}</li>
+     *     <li>Better splitting balance, as stated by {@link 
Spliterator#trySplit()}</li>
+     * </ul>
+     */
+    private final class PrefetchSpliterator extends QuerySpliterator {
+
+        final int fetchSize;
+
+        int idx;
+        List<Feature> chunk;
+        /**
+         * According to {@link Spliterator#trySplit()} documentation, the 
original size estimation must be reduced after
+         * split to remain consistent.
+         */
+        long splittedAmount = 0;
+
+        private PrefetchSpliterator(ResultSet result) throws SQLException {
+            this(result, 0.5f);
+        }
+
+        private PrefetchSpliterator(ResultSet result, float fetchRatio) throws 
SQLException {
+            super(result);
+            this.fetchSize = Math.max((int) 
(result.getFetchSize()*fetchRatio), 1);
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super Feature> action) {
+            if (ensureChunkAvailable()) {
+                action.accept(chunk.get(idx++));
+                return true;
+            }
+            return false;
+        }
+
+        public Spliterator<Feature> trySplit() {
+            if (!ensureChunkAvailable()) return null;
+            final List<Feature> remainingChunk = chunk.subList(idx, 
chunk.size());
+            splittedAmount += remainingChunk.size();
+            final Spliterator<Feature> chunkSpliterator = idx == 0 ?
+                    chunk.spliterator() : remainingChunk.spliterator();
+            chunk = null;
+            idx = 0;
+            return chunkSpliterator;
+        }
 
         @Override
         public long estimateSize() {
-            // TODO: economic size estimation ? A count query seems overkill 
for the aim of this API. Howver, we could
-            // analyze user query in search for a limit value.
-            return originLimit > 0 ? originLimit : Long.MAX_VALUE;
+            return super.estimateSize() - splittedAmount;
         }
 
         @Override
         public int characteristics() {
-            // TODO: determine if it's sorted by analysing user query. SIZED 
is not possible, as limit is an upper threshold.
-            return Spliterator.IMMUTABLE | Spliterator.NONNULL;
+            return super.characteristics() | Spliterator.CONCURRENT;
         }
-    }
 
-    private static SQLBuilder fromQuery(final String query, final Connection 
conn) throws SQLException {
-        return new SQLBuilder(conn.getMetaData(), true)
-                .append(query);
+        private boolean ensureChunkAvailable() {
+            if (chunk == null || idx >= chunk.size()) {
+                idx = 0;
+                try {
+                    chunk = adapter.prefetch(fetchSize, result);
+                } catch (SQLException e) {
+                    throw new BackingStoreException(e);
+                }
+                return chunk != null && !chunk.isEmpty();
+            }
+            return true;
+        }
     }
 }
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
new file mode 100644
index 0000000..bb519d9
--- /dev/null
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
@@ -0,0 +1,150 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sis.internal.metadata.sql.Initializer;
+
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.sis.util.ArgumentChecks.ensureStrictlyPositive;
+
+@Fork(value = 2, jvmArgs = {"-server", "-Xmx2g"} )
+@Warmup(iterations = 2, time = 4, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 2, time = 4, timeUnit = TimeUnit.SECONDS)
+public class QuerySpliteratorsBench {
+
+    @State(Scope.Benchmark)
+    public static class DatabaseInput {
+
+        @Param({"1000", "100000"})
+        int numRows;
+
+        @Param({"true", "false"})
+        boolean parallel;
+
+        @Param({"true", "false"})
+        boolean prefetch;
+
+        @Param({"10", "100", "1000"})
+        int fetchSize;
+
+        @Param({"0.5", "1", "2"})
+        float fetchRatio;
+
+        EmbeddedDataSource db;
+        QueryFeatureSet fs;
+
+        public DatabaseInput() {}
+
+        @Setup(Level.Trial)
+        public void setup() throws SQLException {
+            ensureStrictlyPositive("Number of rows", numRows);
+
+            db = new EmbeddedDataSource();
+            db.setDatabaseName("memory:spliterators");
+            db.setDataSourceName("Apache SIS test database");
+            db.setCreateDatabase("create");
+
+            try (Connection c = db.getConnection()) {
+                c.createStatement().execute(
+                        "CREATE TABLE TEST (str CHARACTER VARYING(20), myInt 
INTEGER, myDouble DOUBLE)"
+                );
+                final PreparedStatement st = c.prepareStatement("INSERT INTO 
TEST values (?, ?, ?)");
+
+                final Random rand = new Random();
+                int rows = 1;
+                final byte[] txt = new byte[20];
+                do {
+                    for (int i = 0; i < 500 ; i++, rows++) {
+                        rand.nextBytes(txt);
+                        st.setString(1, new String(txt, 
StandardCharsets.US_ASCII));
+                        st.setInt(2, rand.nextInt());
+                        st.setDouble(3, rand.nextDouble());
+                        st.addBatch();
+                    }
+                    st.executeBatch();
+                    st.clearBatch();
+                } while (rows < numRows);
+
+                fs = new QueryFeatureSet("SELECT * FROM TEST", db, c);
+                fs.allowBatchLoading = prefetch;
+                fs.fetchSize = fetchSize;
+                fs.fetchRatio = fetchRatio;
+            }
+        }
+
+        @TearDown
+        public void dropDatabase() throws SQLException {
+            db.setCreateDatabase("no");
+            db.setConnectionAttributes("drop=true");
+            try {
+                db.getConnection().close();
+            } catch (SQLException e) {                          // This is the 
expected exception.
+                if (!Initializer.isSuccessfulShutdown(e)) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Benchmark
+    public void test(DatabaseInput input) throws SQLException {
+        final int sum = input.fs.features(input.parallel).mapToInt(f -> 
1).sum();
+        if (sum != input.numRows) throw new AssertionError("..." + sum + "..." 
+ "WTF ?!");
+    }
+/*
+    @Test
+    public void benchmark() throws Exception {
+        System.out.println("COMMON POOL: "+ 
ForkJoinPool.getCommonPoolParallelism());
+        final DatabaseInput db = new DatabaseInput();
+        db.numRows = 100000;
+        db.parallel = true;
+
+        long start = System.nanoTime();
+        db.setup();
+        System.out.println("Insertion time: 
"+((System.nanoTime()-start)/1e6)+" ms");
+
+        // warmup
+        for (int i = 0 ;  i < 5 ; i++) {
+            test(db);
+            test(db);
+        }
+
+        // go
+        long prefetch = 0, noprefetch = 0;
+        for (int i = 0 ; i < 100 ; i++) {
+            start = System.nanoTime();
+            test(db);
+            prefetch += System.nanoTime()-start;
+
+            start = System.nanoTime();
+            test(db);
+            noprefetch += System.nanoTime()-start;
+        }
+
+        System.out.println(String.format(
+                "Performances:%nP: %d%nI: %d",
+                (long) (prefetch / 1e7), (long) (noprefetch / 1e8)
+        ));
+    }
+*/
+    public static void main(String... args) throws Exception {
+        org.openjdk.jmh.Main.main(args);
+    }
+}
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
index 0e35edf..e54823d 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
@@ -1,35 +1,87 @@
 package org.apache.sis.internal.sql.feature;
 
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Optional;
 
 import org.opengis.util.GenericName;
 
+import org.apache.sis.feature.builder.FeatureTypeBuilder;
+import org.apache.sis.internal.feature.AttributeConvention;
 import org.apache.sis.storage.DataStoreContentException;
 
+/**
+ * Defines an application schema inferred from an SQL database (query, table, 
etc.). Implementations will be used by
+ * {@link Analyzer} to create a {@link FeatureAdapter adaptation layer to the 
feature model}. Default implementations
+ * can be retrieved for tables and queries respectively through {@link 
Analyzer#create(TableReference, TableReference)}
+ * and {@link Analyzer#create(PreparedStatement, String, GenericName)} methods.
+ */
 interface SQLTypeSpecification {
     /**
+     * Identifying name for the application schema. It is strongly recommended 
to be present, for the SIS engine to be
+     * capable of creating insightful models. However, in corner cases where 
no proper name can be provided, an empty
+     * value is allowed.
      *
-     * @return Name for the feature type to build. Nullable.
+     * @implNote SIS {@link FeatureTypeBuilder feature type builder} 
<em>requires</em> a name, and current
+     * {@link Analyzer#buildAdapter(SQLTypeSpecification) analysis 
implementation} will create a random UUID if
+     * necessary.
+     *
+     * @return Name for the feature type to build.
      * @throws SQLException If an error occurs while retrieving information 
from database.
      */
-    GenericName getName() throws SQLException;
+    Optional<GenericName> getName() throws SQLException;
 
     /**
+     * Gives an optional description of the application schema. This 
information is not necessary for any kind of
+     * computation, but allows giving the end user global information about 
the schema (s)he's manipulating.
      *
-     * @return A succint description of the data source. Nullable.
+     * @return A brief description of the data source.
      * @throws SQLException If an error occurs while retrieving information 
from database.
      */
-    String getDefinition() throws SQLException;
+    Optional<String> getDefinition() throws SQLException;
 
+    /**
+     * Primary key definition of source schema. Can be empty if no primary key 
is defined (Example: query definition).
+     *
+     * @return Primary key definition if any, otherwise an empty shell.
+     * @throws SQLException If an error occurs while exchanging information 
with underlying database.
+     */
     Optional<PrimaryKey> getPK() throws SQLException;
 
+    /**
+     *
+     * @return Ordered list of columns in application schema. Order is 
important, and will be relied upon to retrieve
+     *  {@link ResultSet#getObject(int) result values by index}.
+     */
     List<SQLColumn> getColumns();
 
+    /**
+     *
+     * @return All identified relations based on a foreign key in the 
<em>current</em> application schema (1..1 or n..1).
+     * Corresponds to {@link Relation.Direction#IMPORT}. Can be empty but not 
null.
+     *
+     * @throws SQLException If an error occurs while exchanging information 
with underlying database.
+     */
     List<Relation> getImports() throws SQLException;
 
+    /**
+     *
+     * @return All identified relations based on foreign key located in 
<em>another</em> application schema (1..n).
+     * Corresponds to {@link Relation.Direction#EXPORT}. Can be empty but not 
null.
+     * @throws SQLException If an error occurs while exchanging information 
with underlying database.
+     * @throws DataStoreContentException If a schema problem is encountered.
+     */
     List<Relation> getExports() throws SQLException, DataStoreContentException;
 
-    default Optional<String> getPrimaryGeometryColumn() {return 
Optional.empty();}
+    /**
+     * In case target schema contains geographic information, this serves to 
identify without ambiguity which column
+     * contains what could be considered main geolocation (as stated by {@link 
AttributeConvention#GEOMETRY_PROPERTY}).
+     * This is very important information in case the application schema 
contains multiple geometric fields.
+     *
+     * @return The name of the column/attribute to be considered as main 
geometry information, or an empty shell if
+     * unknown.
+     */
+    default Optional<ColumnRef> getPrimaryGeometryColumn() {return 
Optional.empty();}
 }
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 890a9dd..a0e9888 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -78,13 +78,14 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     private Consumer<SQLException> warningConsumer = e -> 
Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a 
connection", e);
 
-    StreamSQL(final Table source) {
-        this(new Features.Builder(source), source.source);
+    StreamSQL(final Table source, boolean parallel) {
+        this(new Features.Builder(source), source.source, parallel);
     }
 
-    StreamSQL(QueryBuilder queryAdapter, DataSource source) {
+    StreamSQL(QueryBuilder queryAdapter, DataSource source, boolean parallel) {
         this.queryAdapter = queryAdapter;
         this.source = source;
+        this.parallel = parallel;
     }
 
     public void setWarningConsumer(Consumer<SQLException> warningConsumer) {
@@ -171,8 +172,14 @@ class StreamSQL extends StreamDecoration<Feature> {
     @Override
     public long count() {
         // Avoid opening a connection if sql text cannot be evaluated.
-        final String sql = select().estimateStatement(true);
-        try (Connection conn = source.getConnection()) {
+        final String sql;
+        try {
+            sql = select().estimateStatement(true);
+        } catch (UnsupportedOperationException e) {
+            // If the underlying connector does not support query estimation, we 
will fall back on brute-force counting.
+            return super.count();
+        }
+        try (Connection conn = QueryFeatureSet.connectReadOnly(source)) {
             try (Statement st = conn.createStatement();
                  ResultSet rs = st.executeQuery(sql)) {
                 if (rs.next()) {
@@ -199,7 +206,7 @@ class StreamSQL extends StreamDecoration<Feature> {
                 })
                 .onClose(() -> closeRef(connectionRef, true));
         if (peekAction != null) featureStream = featureStream.peek(peekAction);
-        return parallel ? featureStream : featureStream.parallel();
+        return parallel ? featureStream.parallel() : featureStream;
     }
 
     /**
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 80fc52b..81b8c75 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -418,7 +418,7 @@ final class Table extends AbstractFeatureSet {
      */
     @Override
     public Stream<Feature> features(final boolean parallel) throws 
DataStoreException {
-        return new StreamSQL(this);
+        return new StreamSQL(this, parallel);
     }
 
     /**
diff --git 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
index 2c198a8..19c3151 100644
--- 
a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
+++ 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
@@ -25,8 +25,13 @@
  * This package is for internal use by SIS only. Classes in this package
  * may change in incompatible ways in any future version without notice.
  *
+ * @implNote Feature type analysis is done through {@link 
org.apache.sis.internal.sql.feature.Analyzer} class.
+ * It relies on internal {@link 
org.apache.sis.internal.sql.feature.SQLTypeSpecification} API to fetch SQL 
schema
+ * information, and build {@link 
org.apache.sis.internal.sql.feature.FeatureAdapter an adapter to feature model 
from it}.
+ *
  * @author  Johann Sorel (Geomatys)
  * @author  Martin Desruisseaux (Geomatys)
+ * @author  Alexis Manin (Geomatys)
  * @version 1.0
  * @since   1.0
  * @module

Reply via email to