This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 2e4946ccb239188cca7b3012588bac3e22a82ef5 Author: Alexis Manin <[email protected]> AuthorDate: Tue Aug 20 17:55:56 2019 +0200 feat(SQL-Store): add support for limit, offset and distinct operations via SQL calls through java.util.Stream API Still need to allow mixing map operations though --- .../sis/internal/metadata/sql/SQLBuilder.java | 11 ++ ...{DecoratedStream.java => StreamDecoration.java} | 94 ++++++----- .../java/org/apache/sis/util/ArgumentChecks.java | 30 +++- .../apache/sis/internal/sql/feature/ColumnRef.java | 60 +++++++ .../apache/sis/internal/sql/feature/Features.java | 176 +++++++++++++++++---- .../sis/internal/sql/feature/SpatialFunctions.java | 22 +-- .../apache/sis/internal/sql/feature/StreamSQL.java | 144 +++++++++++++++++ .../org/apache/sis/internal/sql/feature/Table.java | 110 +++---------- 8 files changed, 471 insertions(+), 176 deletions(-) diff --git a/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java b/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java index c4ab4c4..1cb7e8a 100644 --- a/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java +++ b/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java @@ -121,6 +121,17 @@ public class SQLBuilder { } /** + * Appends the given long. + * + * @param n the long to append. + * @return this builder, for method call chaining. + */ + public final SQLBuilder append(final long n) { + buffer.append(n); + return this; + } + + /** * Appends the given character. * * @param c the character to append. 
diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java similarity index 63% rename from core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java rename to core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java index eb398d9..53637b2 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java @@ -21,223 +21,219 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; -public class DecoratedStream<T> implements Stream<T> { - - final Stream<T> source; - - protected DecoratedStream(Stream<T> source) { - this.source = source; - } +public abstract class StreamDecoration<T> implements Stream<T> { @Override public Stream<T> filter(Predicate<? super T> predicate) { - return source.filter(predicate); + return getDecoratedStream().filter(predicate); } @Override public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { - return source.map(mapper); + return getDecoratedStream().map(mapper); } @Override public IntStream mapToInt(ToIntFunction<? super T> mapper) { - return source.mapToInt(mapper); + return getDecoratedStream().mapToInt(mapper); } @Override public LongStream mapToLong(ToLongFunction<? super T> mapper) { - return source.mapToLong(mapper); + return getDecoratedStream().mapToLong(mapper); } @Override public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { - return source.mapToDouble(mapper); + return getDecoratedStream().mapToDouble(mapper); } @Override public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { - return source.flatMap(mapper); + return getDecoratedStream().flatMap(mapper); } @Override public IntStream flatMapToInt(Function<? super T, ? 
extends IntStream> mapper) { - return source.flatMapToInt(mapper); + return getDecoratedStream().flatMapToInt(mapper); } @Override public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { - return source.flatMapToLong(mapper); + return getDecoratedStream().flatMapToLong(mapper); } @Override public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { - return source.flatMapToDouble(mapper); + return getDecoratedStream().flatMapToDouble(mapper); } @Override public Stream<T> distinct() { - return source.distinct(); + return getDecoratedStream().distinct(); } @Override public Stream<T> sorted() { - return source.sorted(); + return getDecoratedStream().sorted(); } @Override public Stream<T> sorted(Comparator<? super T> comparator) { - return source.sorted(comparator); + return getDecoratedStream().sorted(comparator); } @Override public Stream<T> peek(Consumer<? super T> action) { - return source.peek(action); + return getDecoratedStream().peek(action); } @Override public Stream<T> limit(long maxSize) { - return source.limit(maxSize); + return getDecoratedStream().limit(maxSize); } @Override public Stream<T> skip(long n) { - return source.skip(n); + return getDecoratedStream().skip(n); } /* @Override public Stream<T> takeWhile(Predicate<? super T> predicate) { - return source.takeWhile(predicate); + return getDecoratedStream().takeWhile(predicate); } @Override public Stream<T> dropWhile(Predicate<? super T> predicate) { - return source.dropWhile(predicate); + return getDecoratedStream().dropWhile(predicate); } */ @Override public void forEach(Consumer<? super T> action) { - source.forEach(action); + getDecoratedStream().forEach(action); } @Override public void forEachOrdered(Consumer<? 
super T> action) { - source.forEachOrdered(action); + getDecoratedStream().forEachOrdered(action); } @Override public Object[] toArray() { - return source.toArray(); + return getDecoratedStream().toArray(); } @Override public <A> A[] toArray(IntFunction<A[]> generator) { - return source.toArray(generator); + return getDecoratedStream().toArray(generator); } @Override public T reduce(T identity, BinaryOperator<T> accumulator) { - return source.reduce(identity, accumulator); + return getDecoratedStream().reduce(identity, accumulator); } @Override public Optional<T> reduce(BinaryOperator<T> accumulator) { - return source.reduce(accumulator); + return getDecoratedStream().reduce(accumulator); } @Override public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { - return source.reduce(identity, accumulator, combiner); + return getDecoratedStream().reduce(identity, accumulator, combiner); } @Override public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { - return source.collect(supplier, accumulator, combiner); + return getDecoratedStream().collect(supplier, accumulator, combiner); } @Override public <R, A> R collect(Collector<? super T, A, R> collector) { - return source.collect(collector); + return getDecoratedStream().collect(collector); } @Override public Optional<T> min(Comparator<? super T> comparator) { - return source.min(comparator); + return getDecoratedStream().min(comparator); } @Override public Optional<T> max(Comparator<? super T> comparator) { - return source.max(comparator); + return getDecoratedStream().max(comparator); } @Override public long count() { - return source.count(); + return getDecoratedStream().count(); } @Override public boolean anyMatch(Predicate<? super T> predicate) { - return source.anyMatch(predicate); + return getDecoratedStream().anyMatch(predicate); } @Override public boolean allMatch(Predicate<? 
super T> predicate) { - return source.allMatch(predicate); + return getDecoratedStream().allMatch(predicate); } @Override public boolean noneMatch(Predicate<? super T> predicate) { - return source.noneMatch(predicate); + return getDecoratedStream().noneMatch(predicate); } @Override public Optional<T> findFirst() { - return source.findFirst(); + return getDecoratedStream().findFirst(); } @Override public Optional<T> findAny() { - return source.findAny(); + return getDecoratedStream().findAny(); } @Override public Iterator<T> iterator() { - return source.iterator(); + return getDecoratedStream().iterator(); } @Override public Spliterator<T> spliterator() { - return source.spliterator(); + return getDecoratedStream().spliterator(); } @Override public boolean isParallel() { - return source.isParallel(); + return getDecoratedStream().isParallel(); } @Override public Stream<T> sequential() { - return source.sequential(); + return getDecoratedStream().sequential(); } @Override public Stream<T> parallel() { - return source.parallel(); + return getDecoratedStream().parallel(); } @Override public Stream<T> unordered() { - return source.unordered(); + return getDecoratedStream().unordered(); } @Override public Stream<T> onClose(Runnable closeHandler) { - return source.onClose(closeHandler); + return getDecoratedStream().onClose(closeHandler); } @Override public void close() { - source.close(); + getDecoratedStream().close(); } + + protected abstract Stream<T> getDecoratedStream(); } diff --git a/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java b/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java index 0c22f0f..a4e3146 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java +++ b/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java @@ -16,15 +16,17 @@ */ package org.apache.sis.util; -import java.util.Map; // For javadoc import java.util.BitSet; -import 
org.opengis.referencing.cs.CoordinateSystem; -import org.opengis.referencing.crs.CoordinateReferenceSystem; -import org.opengis.geometry.Envelope; +import java.util.Collection; +import java.util.Map; + import org.opengis.geometry.DirectPosition; +import org.opengis.geometry.Envelope; import org.opengis.geometry.MismatchedDimensionException; -import org.apache.sis.internal.util.Strings; +import org.opengis.referencing.crs.CoordinateReferenceSystem; +import org.opengis.referencing.cs.CoordinateSystem; +import org.apache.sis.internal.util.Strings; import org.apache.sis.util.resources.Errors; @@ -190,6 +192,24 @@ public final class ArgumentChecks extends Static { } /** + * Makes sure that given collection is non-null and non-empty. If it is null, then a {@link NullArgumentException} + * is thrown. Otherwise if it {@link Collection#isEmpty() is empty}, then an {@link IllegalArgumentException} is thrown. + * + * @param name the name of the argument to be checked. Used only if an exception is thrown. + * @param toCheck the user argument to check against null value and empty collection. + * @throws NullArgumentException if {@code toCheck} is null. + * @throws IllegalArgumentException if {@code toCheck} is empty. + */ + public static void ensureNonEmpty(final String name, final Collection<?> toCheck) { + if (toCheck == null) { + throw new NullArgumentException(Errors.format(Errors.Keys.NullArgument_1, name)); + } + if (toCheck.isEmpty()) { + throw new IllegalArgumentException(Errors.format(Errors.Keys.EmptyArgument_1, name)); + } + } + + /** * Ensures that the given {@code values} array is non-null and non-empty. This method can also ensures that all values * are between the given bounds (inclusive) and are distinct. The distinct values requirement is useful for validating * arrays of spatiotemporal dimension indices, where dimensions can not be repeated. 
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java new file mode 100644 index 0000000..f2d229d --- /dev/null +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java @@ -0,0 +1,60 @@ +package org.apache.sis.internal.sql.feature; + +import java.util.Objects; + +import org.apache.sis.internal.metadata.sql.SQLBuilder; + +import static org.apache.sis.util.ArgumentChecks.ensureNonNull; + +/** + * A column reference. Specify name of the column, and optionally an alias to use for public visibility. + * By default, column has no alias. To create a column with an alias, use {@code ColumnRef myCol = new ColumnRef("colName).as("myAlias");} + */ +public final class ColumnRef { + final String name; + final String alias; + final String attrName; + + public ColumnRef(String name) { + ensureNonNull("Column name", name); + this.name = this.attrName = name; + alias = null; + } + + private ColumnRef(final String name, final String alias) { + ensureNonNull("Column alias", alias); + this.name = name; + this.alias = this.attrName = alias; + } + + ColumnRef as(final String alias) { + return new ColumnRef(name, alias); + } + + SQLBuilder append(final SQLBuilder target) { + target.appendIdentifier(name); + if (alias != null) { + target.append(" AS ").appendIdentifier(alias); + } + + return target; + } + + public String getAttributeName() { + return attrName; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ColumnRef)) return false; + ColumnRef columnRef = (ColumnRef) o; + return name.equals(columnRef.name) && + Objects.equals(alias, columnRef.alias); + } + + @Override + public int hashCode() { + return Objects.hash(name, alias); + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java 
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java index 57e9c1f..0934bf7 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java @@ -16,29 +16,37 @@ */ package org.apache.sis.internal.sql.feature; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.Collection; -import java.util.Spliterator; -import java.util.function.Consumer; +import java.lang.reflect.Array; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.Statement; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.lang.reflect.Array; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.opengis.feature.Feature; +import org.opengis.feature.FeatureType; + import org.apache.sis.internal.metadata.sql.SQLBuilder; +import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.InternalDataStoreException; +import org.apache.sis.util.ArraysExt; import org.apache.sis.util.collection.BackingStoreException; import org.apache.sis.util.collection.WeakValueHashMap; -import org.apache.sis.util.ArraysExt; + +import static org.apache.sis.util.ArgumentChecks.ensureNonEmpty; +import static org.apache.sis.util.ArgumentChecks.ensureNonNull; // Branch-dependent imports -import org.opengis.feature.Feature; -import org.opengis.feature.FeatureType; /** @@ -141,25 +149,45 @@ final class Features implements Spliterator<Feature>, Runnable { /** * Creates a new iterator over the feature instances. + * TODO: This object is far too complicated. 
A builder of some sort should be used. We should even consider a + * third-party tool like JOOQ, which is a great abstraction for SQL query building. * * @param table the table for which we are creating an iterator. * @param connection connection to the database. - * @param attributeNames value of {@link Table#attributeNames}: where to store simple values. - * @param attributeColumns value of {@link Table#attributeColumns}: often the same as attribute names. - * @param importedKeys value of {@link Table#importedKeys}: targets of this table foreign keys. - * @param exportedKeys value of {@link Table#exportedKeys}: foreigner keys of other tables. + * @param columns Names of columns to read, along with an optional alias. The alias (or name if no alias + * is provided) must match a property name in output feature type. * @param following the relations that we are following. Used for avoiding never ending loop. * @param noFollow relation to not follow, or {@code null} if none. + * @param distinct True if we should return only distinct results, false otherwise. + * @param offset An offset (number of rows to skip) in underlying SQL query. A negative or zero value + * means no offset will be set. + * @param limit Maximum number of rows to return. Corresponds to a LIMIT clause in underlying SQL + * query. A negative or 0 value means no limit will be set.
*/ - Features(final Table table, final Connection connection, final String[] attributeNames, final String[] attributeColumns, - final Relation[] importedKeys, final Relation[] exportedKeys, final List<Relation> following, final Relation noFollow) + Features(final Table table, final Connection connection, final Collection<ColumnRef> columns, + final List<Relation> following, final Relation noFollow, + boolean distinct, final long offset, final long limit) throws SQLException, InternalDataStoreException { + ensureNonEmpty("Columns to fetch", columns); + String[] attributeColumns = new String[columns.size()]; + attributeNames = new String[attributeColumns.length]; + int i = 0; + for (ColumnRef column : columns) { + attributeColumns[i] = column.name; + attributeNames[i++] = column.getAttributeName(); + } this.featureType = table.featureType; - this.attributeNames = attributeNames; final DatabaseMetaData metadata = connection.getMetaData(); estimatedSize = following.isEmpty() ? table.countRows(metadata, true) : 0; + /* + * Create a SELECT clause with all columns that are ordinary attributes. Order matter, since 'Features' + * iterator will map the columns to the attributes listed in the 'attributeNames' array in that order. + * Moreover, we optionaly add a "distinct" clause on user request. + */ final SQLBuilder sql = new SQLBuilder(metadata, true).append("SELECT"); + if (distinct) sql.append(" DISTINCT"); + final Map<String,Integer> columnIndices = new HashMap<>(); /* * Create a SELECT clause with all columns that are ordinary attributes. @@ -169,12 +197,13 @@ final class Features implements Spliterator<Feature>, Runnable { for (String column : attributeColumns) { appendColumn(sql, column, columnIndices); } + /* * Collect information about associations in local arrays before to assign * them to the final fields, because some array lengths may be adjusted. */ - int importCount = (importedKeys != null) ? 
importedKeys.length : 0; - int exportCount = (exportedKeys != null) ? exportedKeys.length : 0; + int importCount = (table.importedKeys != null) ? table.importedKeys.length : 0; + int exportCount = (table.exportedKeys != null) ? table.exportedKeys.length : 0; int totalCount = importCount + exportCount; if (totalCount == 0) { dependencies = EMPTY; @@ -192,7 +221,7 @@ final class Features implements Spliterator<Feature>, Runnable { */ if (importCount != 0) { importCount = 0; // We will recount. - for (final Relation dependency : importedKeys) { + for (final Relation dependency : table.importedKeys) { if (dependency != noFollow) { dependency.startFollowing(following); // Safety against never-ending recursivity. associationNames [importCount] = dependency.propertyName; @@ -212,8 +241,8 @@ final class Features implements Spliterator<Feature>, Runnable { * associations we need to iterate over all "Parks" rows referencing the city. */ if (exportCount != 0) { - int i = importCount; - for (final Relation dependency : exportedKeys) { + i = importCount; + for (final Relation dependency : table.exportedKeys) { dependency.startFollowing(following); // Safety against never-ending recursivity. final Table foreigner = dependency.getSearchTable(); final Relation inverse = foreigner.getInverseOf(dependency, table.name); @@ -241,7 +270,13 @@ final class Features implements Spliterator<Feature>, Runnable { statement = null; instances = null; // A future SIS version could use the map opportunistically if it exists. keyComponentClass = null; - result = connection.createStatement().executeQuery(sql.toString()); + addOffsetLimit(sql, offset, limit); + final Statement statement = connection.createStatement(); + /* Why this parameter ? See: https://gitlab.geomatys.com/geomatys-group/knowledge-base/wikis/cookbook/jdbc + * TODO : allow parameterization ? 
+ */ + statement.setFetchSize(100); + result = statement.executeQuery(sql.toString()); } else { final Relation componentOf = following.get(following.size() - 1); String separator = " WHERE "; @@ -249,6 +284,7 @@ final class Features implements Spliterator<Feature>, Runnable { sql.append(separator).appendIdentifier(primaryKey).append("=?"); separator = " AND "; } + addOffsetLimit(sql, offset, limit); statement = connection.prepareStatement(sql.toString()); /* * Following assumes that the foreigner key references the primary key of this table, @@ -266,11 +302,22 @@ final class Features implements Spliterator<Feature>, Runnable { } /** + * If a limit or an offset is appended, a space will be added beforehand to the given builder. + * @param toEdit The builder to add offset and limit to. + * @param offset The offset to use. If zero or negative, it will be ignored. + * @param limit the value for limit parameter. If zero or negative, it will be ignored. + */ + private static void addOffsetLimit(final SQLBuilder toEdit, final long offset, final long limit) { + if (limit > 0) toEdit.append(" LIMIT ").append(limit); + if (offset > 0) toEdit.append(" OFFSET ").append(offset); + } + + /** * Appends a columns in the given builder and remember the column indices. * An exception is thrown if the column has already been added (should never happen). */ private static int appendColumn(final SQLBuilder sql, final String column, - final Map<String,Integer> columnIndices) throws InternalDataStoreException + final Map<String,Integer> columnIndices) throws InternalDataStoreException { int columnCount = columnIndices.size(); if (columnCount != 0) sql.append(','); @@ -283,8 +330,7 @@ final class Features implements Spliterator<Feature>, Runnable { * Computes the 1-based indices of given columns, adding the columns in the given builder if necessary. 
*/ private static int[] getColumnIndices(final SQLBuilder sql, final Collection<String> columns, - final Map<String,Integer> columnIndices) throws InternalDataStoreException - { + final Map<String,Integer> columnIndices) throws InternalDataStoreException { int i = 0; final int[] indices = new int[columns.size()]; for (final String column : columns) { @@ -496,4 +542,78 @@ final class Features implements Spliterator<Feature>, Runnable { throw new BackingStoreException(e); } } + + /** + * Useful to customiez value retrieval on result sets. Example: + * {@code + * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt; + * } + * @param <T> + * @param <U> + * @param <R> + */ + @FunctionalInterface + interface SQLBiFunction<T, U, R> { + R apply(T t, U u) throws SQLException; + + /** + * Returns a composed function that first applies this function to + * its input, and then applies the {@code after} function to the result. + * If evaluation of either function throws an exception, it is relayed to + * the caller of the composed function. + * + * @param <V> the type of output of the {@code after} function, and of the + * composed function + * @param after the function to apply after this function is applied + * @return a composed function that first applies this function and then + * applies the {@code after} function + * @throws NullPointerException if after is null + */ + default <V> SQLBiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) { + ensureNonNull("After function", after); + return (T t, U u) -> after.apply(apply(t, u)); + } + } + + static class Builder { + + final Table parent; + long limit, offset; + boolean distinct; + + Builder(Table parent) { + this.parent = parent; + } + + /** + * Warning : This does not work with relations. It is only a rough estimation of the parameterized query. + * @param count True if a count query must be generated. False for a simple selection. 
+ * @return A text representing (roughly) the SQL query which will be posted. + * @throws SQLException If we cannot initialize an sql statement builder. + */ + String getSnapshot(final boolean count) throws SQLException { + final SQLBuilder sql = new SQLBuilder(parent.dbMeta, true).append("SELECT "); + if (count) sql.append("COUNT("); + if (distinct) sql.append("DISTINCT "); + // If we want a count and no distinct clause is specified, we can query it for a single column. + if (count && !distinct) sql.appendIdentifier(parent.attributes.get(0).name); + else { + final Iterator<ColumnRef> it = parent.attributes.iterator(); + sql.appendIdentifier(it.next().name); + while (it.hasNext()) { + sql.append(',').appendIdentifier(it.next().name); + } + } + + if (count) sql.append(')'); + sql.append(" FROM ").appendIdentifier(parent.name.catalog, parent.name.schema, parent.name.table); + addOffsetLimit(sql, offset, limit); + + return sql.toString(); + } + + Features build(final Connection conn) throws SQLException, DataStoreException { + return new Features(parent, conn, parent.attributes, new ArrayList<>(), null, distinct, offset, limit); + } + } } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java index 7ef4e90..d01dbdf 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java @@ -17,16 +17,18 @@ package org.apache.sis.internal.sql.feature; import java.math.BigDecimal; -import java.time.LocalDate; -import java.time.LocalTime; -import java.time.LocalDateTime; -import java.time.OffsetTime; -import java.time.OffsetDateTime; -import java.sql.Types; +import java.sql.DatabaseMetaData; +import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; -import 
java.sql.DatabaseMetaData; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.OffsetDateTime; +import java.time.OffsetTime; + import org.opengis.referencing.crs.CoordinateReferenceSystem; + import org.apache.sis.internal.metadata.sql.Reflection; import org.apache.sis.setup.GeometryLibrary; @@ -112,9 +114,9 @@ class SpatialFunctions { case Types.CHAR: case Types.VARCHAR: case Types.LONGVARCHAR: return String.class; - case Types.DATE: return LocalDate.class; - case Types.TIME: return LocalTime.class; - case Types.TIMESTAMP: return LocalDateTime.class; + case Types.DATE: return Date.class; + case Types.TIME: return Time.class; + case Types.TIMESTAMP: return Timestamp.class; case Types.TIME_WITH_TIMEZONE: return OffsetTime.class; case Types.TIMESTAMP_WITH_TIMEZONE: return OffsetDateTime.class; case Types.BINARY: diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java new file mode 100644 index 0000000..7e948a3 --- /dev/null +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -0,0 +1,144 @@ +package org.apache.sis.internal.sql.feature; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.opengis.feature.Feature; + +import org.apache.sis.internal.util.StreamDecoration; 
+import org.apache.sis.storage.DataStoreException; +import org.apache.sis.util.collection.BackingStoreException; + +class StreamSQL extends StreamDecoration<Feature> { + + final Features.Builder queryBuilder; + boolean parallel; + + StreamSQL(final Table source) { + this(new Features.Builder(source)); + } + + StreamSQL(Features.Builder builder) { + this.queryBuilder = builder; + } + + @Override + public <R> Stream<R> map(Function<? super Feature, ? extends R> mapper) { + return super.map(mapper); + } + + @Override + public IntStream mapToInt(ToIntFunction<? super Feature> mapper) { + return super.mapToInt(mapper); + } + + @Override + public LongStream mapToLong(ToLongFunction<? super Feature> mapper) { + return super.mapToLong(mapper); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction<? super Feature> mapper) { + return super.mapToDouble(mapper); + } + + @Override + public Stream<Feature> parallel() { + parallel = true; + return this; + } + + @Override + public Stream<Feature> sequential() { + parallel = false; + return this; + } + + @Override + public Stream<Feature> distinct() { + queryBuilder.distinct = true; + return this; + } + + @Override + public Stream<Feature> peek(Consumer<? super Feature> action) { + return super.peek(action); + } + + @Override + public Stream<Feature> limit(long maxSize) { + if (queryBuilder.limit < 1) queryBuilder.limit = maxSize; + else queryBuilder.limit = Math.min(queryBuilder.limit, maxSize); + return this; + } + + @Override + public Stream<Feature> skip(long n) { + queryBuilder.offset += n; + return this; + } + + @Override + public long count() { + // Avoid opening a connection if sql text cannot be evaluated. 
+ final String sql; + try { + sql = queryBuilder.getSnapshot(true); + } catch (SQLException e) { + throw new BackingStoreException("Cannot create SQL COUNT query", e); + } + try (Connection conn = queryBuilder.parent.source.getConnection()) { + try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(sql)) { + if (rs.next()) { + return rs.getLong(1); + } else return 0; + } + } catch (SQLException e) { + throw new BackingStoreException("Cannot estimate feature set size using SQL COUNT query", e); + } + } + + @Override + protected synchronized Stream<Feature> getDecoratedStream() { + final AtomicReference<Connection> connectionRef = new AtomicReference<>(); + return Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection())) + .map(Supplier::get) + .peek(connectionRef::set) + .flatMap(conn -> { + try { + final Features iter = queryBuilder.build(conn); + return StreamSupport.stream(iter, parallel).onClose(iter); + } catch (SQLException | DataStoreException e) { + throw new BackingStoreException(e); + } + }) + .onClose(() -> queryBuilder.parent.closeRef(connectionRef)); + } + + private static <T> Supplier<T> uncheck(final Callable<T> generator) { + return () -> { + try { + return generator.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new BackingStoreException(e); + } + }; + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java index 380669b..321561c 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java @@ -20,19 +20,16 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; +import 
java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import javax.sql.DataSource; import org.opengis.feature.AttributeType; @@ -52,7 +49,6 @@ import org.apache.sis.internal.metadata.sql.SQLBuilder; import org.apache.sis.internal.metadata.sql.SQLUtilities; import org.apache.sis.internal.storage.AbstractFeatureSet; import org.apache.sis.internal.util.CollectionsExt; -import org.apache.sis.internal.util.DecoratedStream; import org.apache.sis.storage.DataStoreContentException; import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.InternalDataStoreException; @@ -60,7 +56,6 @@ import org.apache.sis.util.CharSequences; import org.apache.sis.util.Classes; import org.apache.sis.util.Debug; import org.apache.sis.util.Numbers; -import org.apache.sis.util.collection.BackingStoreException; import org.apache.sis.util.collection.TreeTable; import org.apache.sis.util.collection.WeakValueHashMap; @@ -83,7 +78,7 @@ final class Table extends AbstractFeatureSet { /** * Provider of (pooled) connections to the database. */ - private final DataSource source; + final DataSource source; /** * The structure of this table represented as a feature. Each feature attribute is a table column, @@ -98,18 +93,11 @@ final class Table extends AbstractFeatureSet { final TableReference name; /** - * Name of attributes in feature instances, excluding operations and associations to other tables. - * Those names are in the order of columns declared in the {@code SELECT <columns} statement. - * This array shall not be modified after construction. + * Name of all columns to fetch from database, optionally amended with an alias. 
Alias is used for feature type + * attributes which have been renamed to avoid name collisions. In any case, a call to {@link ColumnRef#getAttributeName()}} + * will return the name available in target feature type. */ - private final String[] attributeNames; - - /** - * Name of columns corresponding to each {@link #attributeNames}. This is often a reference to the - * same array than {@link #attributeNames}, but may be different if some attributes have been renamed - * for avoiding name collisions. - */ - private final String[] attributeColumns; + final List<ColumnRef> attributes; /** * The columns that constitute the primary key, or {@code null} if there is no primary key. @@ -120,13 +108,13 @@ final class Table extends AbstractFeatureSet { * The primary keys of other tables that are referenced by this table foreign key columns. * They are 0:1 relations. May be {@code null} if there is no imported keys. */ - private final Relation[] importedKeys; + final Relation[] importedKeys; /** * The foreign keys of other tables that reference this table primary key columns. * They are 0:N relations. May be {@code null} if there is no exported keys. */ - private final Relation[] exportedKeys; + final Relation[] exportedKeys; /** * The class of primary key values, or {@code null} if there is no primary keys. @@ -152,6 +140,11 @@ final class Table extends AbstractFeatureSet { final boolean hasGeometry; /** + * Keep a reference of target database metadata, to ease creation of {@link SQLBuilder}. + */ + final DatabaseMetaData dbMeta; + + /** * Creates a description of the table of the given name. * The table is identified by {@code id}, which contains a (catalog, schema, name) tuple. * The catalog and schema parts are optional and can be null, but the table is mandatory. 
@@ -165,6 +158,7 @@ final class Table extends AbstractFeatureSet { throws SQLException, DataStoreException { super(analyzer.listeners); + this.dbMeta = analyzer.metadata; this.source = analyzer.source; this.name = id; final String tableEsc = analyzer.escape(id.table); @@ -230,8 +224,7 @@ final class Table extends AbstractFeatureSet { boolean primaryKeyNonNull = true; boolean hasGeometry = false; int startWithLowerCase = 0; - final List<String> attributeNames = new ArrayList<>(); - final List<String> attributeColumns = new ArrayList<>(); + final List<ColumnRef> attributes = new ArrayList<>(); final FeatureTypeBuilder feature = new FeatureTypeBuilder(analyzer.nameFactory, analyzer.functions.library, analyzer.locale); try (ResultSet reflect = analyzer.metadata.getColumns(id.catalog, schemaEsc, tableEsc, null)) { while (reflect.next()) { @@ -251,6 +244,8 @@ final class Table extends AbstractFeatureSet { startWithLowerCase--; } } + + ColumnRef colRef = new ColumnRef(column); /* * Add the column as an attribute. Foreign keys are excluded (they will be replaced by associations), * except if the column is also a primary key. 
In the later case we need to keep that column because @@ -258,8 +253,6 @@ final class Table extends AbstractFeatureSet { */ AttributeTypeBuilder<?> attribute = null; if (isPrimaryKey || dependencies == null) { - attributeNames.add(column); - attributeColumns.add(column); final String typeName = reflect.getString(Reflection.TYPE_NAME); Class<?> type = analyzer.functions.toJavaType(reflect.getInt(Reflection.DATA_TYPE), typeName); if (type == null) { @@ -339,12 +332,14 @@ final class Table extends AbstractFeatureSet { */ if (attribute != null) { attribute.setName(analyzer.nameFactory.createGenericName(null, "pk", column)); - attributeNames.set(attributeNames.size() - 1, attribute.getName().toString()); + colRef = colRef.as(attribute.getName().toString()); attribute = null; } } } } + + attributes.add(colRef); } } /* @@ -420,9 +415,7 @@ final class Table extends AbstractFeatureSet { this.exportedKeys = toArray(exportedKeys); this.primaryKeyClass = primaryKeyClass; this.hasGeometry = hasGeometry; - this.attributeNames = attributeNames.toArray(new String[attributeNames.size()]); - this.attributeColumns = attributeColumns.equals(attributeNames) ? 
this.attributeNames - : attributeColumns.toArray(new String[attributeColumns.size()]); + this.attributes = Collections.unmodifiableList(attributes); } /** @@ -501,8 +494,8 @@ final class Table extends AbstractFeatureSet { @Debug final void appendTo(TreeTable.Node parent) { parent = Relation.newChild(parent, featureType.getName().toString()); - for (final String attribute : attributeNames) { - TableReference.newChild(parent, attribute); + for (final ColumnRef attribute : attributes) { + TableReference.newChild(parent, attribute.getAttributeName()); } appendAll(parent, importedKeys, " → "); appendAll(parent, exportedKeys, " ← "); @@ -610,22 +603,10 @@ final class Table extends AbstractFeatureSet { */ @Override public Stream<Feature> features(final boolean parallel) throws DataStoreException { - final AtomicReference<Connection> connectionRef = new AtomicReference<>(); - final Stream<Feature> featureStream = Stream.generate(uncheck(() -> source.getConnection())) - .peek(connectionRef::set) - .flatMap(conn -> { - try { - final Features iter = features(conn, new ArrayList<>(), null); - return StreamSupport.stream(iter, parallel).onClose(iter); - } catch (SQLException | InternalDataStoreException e) { - throw new BackingStoreException(e); - } - }) - .onClose(() -> closeRef(connectionRef)); - return new CountOverload<>(featureStream); + return new StreamSQL(this); } - private void closeRef(final AtomicReference<Connection> ref) { + void closeRef(final AtomicReference<Connection> ref) { final Connection conn = ref.get(); if (conn != null) { try { @@ -646,45 +627,6 @@ final class Table extends AbstractFeatureSet { final Features features(final Connection connection, final List<Relation> following, final Relation noFollow) throws SQLException, InternalDataStoreException { - return new Features(this, connection, attributeNames, attributeColumns, importedKeys, exportedKeys, following, noFollow); - } - - private class CountOverload<T> extends DecoratedStream<T> { - - 
CountOverload(Stream<T> source) { - super(source); - } - - @Override - public long count() { - try (Connection conn = Table.this.source.getConnection()) { - final String query = new SQLBuilder(conn.getMetaData(), true) - .append("SELECT COUNT(") - .appendIdentifier(attributeColumns[0]) - .append(')') - .append(" FROM ") - .appendIdentifier(name.catalog, name.schema, name.table) - .toString(); - try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(query)) { - if (rs.next()) { - return rs.getLong(1); - } else return 0; - } - } catch (SQLException e) { - throw new BackingStoreException("Cannot estimate feature set size using SQL COUNT query", e); - } - } - } - - private static <T> Supplier<T> uncheck(final Callable<T> generator) { - return () -> { - try { - return generator.call(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new BackingStoreException(e); - } - }; + return new Features(this, connection, attributes, following, noFollow, false, -1, -1); } }
