This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 1fb97acfdf451475c2ab125ead3bef4eee6fc164 Author: Alexis Manin <[email protected]> AuthorDate: Tue Aug 20 17:55:56 2019 +0200 feat(SQL-Store): add support for limit, offset and distinct operations via SQL calls through java.util.Stream API Still need to allow mixing map operations though --- .../sis/internal/metadata/sql/SQLBuilder.java | 11 + .../sis/internal/util/BaseStreamDecoration.java | 104 ++++++ .../apache/sis/internal/util/DecoratedStream.java | 243 ------------- .../sis/internal/util/DoubleStreamDecoration.java | 201 +++++++++++ .../apache/sis/internal/util/StreamDecoration.java | 219 ++++++++++++ .../java/org/apache/sis/util/ArgumentChecks.java | 30 +- .../apache/sis/internal/sql/feature/ColumnRef.java | 60 ++++ .../apache/sis/internal/sql/feature/Features.java | 176 ++++++++-- .../sis/internal/sql/feature/SpatialFunctions.java | 22 +- .../apache/sis/internal/sql/feature/StreamSQL.java | 380 +++++++++++++++++++++ .../org/apache/sis/internal/sql/feature/Table.java | 110 ++---- .../org/apache/sis/storage/sql/SQLStoreTest.java | 97 +++++- .../java/org/apache/sis/storage/FeatureNaming.java | 5 +- 13 files changed, 1272 insertions(+), 386 deletions(-) diff --git a/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java b/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java index c4ab4c4..1cb7e8a 100644 --- a/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java +++ b/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java @@ -121,6 +121,17 @@ public class SQLBuilder { } /** + * Appends the given long. + * + * @param n the long to append. + * @return this builder, for method call chaining. + */ + public final SQLBuilder append(final long n) { + buffer.append(n); + return this; + } + + /** * Appends the given character. * * @param c the character to append. diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java new file mode 100644 index 0000000..980aac1 --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.util; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.stream.BaseStream; +import javax.validation.constraints.NotNull; + +/** + * Delegates operations to an underlying stream provided by {@link #createDecoratedStream()}. Allows for custom logic + * decoration. See {@link StreamDecoration} for further details. + * + * @param <T> Type of values contained in the stream, as defined in {@link BaseStream} + * @param <S> Type of stream implementation, as defined in {@link BaseStream} + * + * @since 1.0 + * + * @author Alexis Manin (Geomatys) + */ +public abstract class BaseStreamDecoration<T, S extends BaseStream<T, S>> implements BaseStream<T, S> { + + private S decorated; + + private boolean closed; + + /** + * Get previously created wrapped stream, or create it if never done. + * @return The stream containing actual data. + */ + + protected final @NotNull S getOrCreate() { + if (closed) throw new IllegalStateException("Stream has already been closed."); + if (decorated == null) { + decorated = createDecoratedStream(); + } + + return decorated; + } + + /** + * Operation that creates underlying stream to delegate operations to as a last resort. Note that sub-classes should + * never call this method. Instead, please use {@link #getOrCreate()}, ensuring that decorated stream is created + * only once, then potentially re-used multiple times. + * + * @return A new, non-consumed stream to delegate actions to. + */ + protected abstract @NotNull S createDecoratedStream(); + + @Override + public void close() { + closed = true; + if (decorated != null) decorated.close(); + } + + @Override + public Iterator<T> iterator() { + return getOrCreate().iterator(); + } + + @Override + public Spliterator<T> spliterator() { + return getOrCreate().spliterator(); + } + + @Override + public boolean isParallel() { + return getOrCreate().isParallel(); + } + + @Override + public S sequential() { + return getOrCreate().sequential(); + } + + @Override + public S parallel() { + return getOrCreate().parallel(); + } + + @Override + public S unordered() { + return getOrCreate().unordered(); + } + + @Override + public S onClose(Runnable closeHandler) { + return getOrCreate().onClose(closeHandler); + } +} diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java deleted file mode 100644 index eb398d9..0000000 --- a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java +++ /dev/null @@ -1,243 +0,0 @@ -package org.apache.sis.internal.util; - -import java.util.Comparator; -import java.util.Iterator; -import java.util.Optional; -import java.util.Spliterator; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BinaryOperator; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.IntFunction; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.function.ToDoubleFunction; -import java.util.function.ToIntFunction; -import java.util.function.ToLongFunction; -import java.util.stream.Collector; -import java.util.stream.DoubleStream; -import java.util.stream.IntStream; -import java.util.stream.LongStream; -import java.util.stream.Stream; - -public class DecoratedStream<T> implements Stream<T> { - - final Stream<T> source; - - protected DecoratedStream(Stream<T> source) { - this.source = source; - } - - @Override - public Stream<T> filter(Predicate<? super T> predicate) { - return source.filter(predicate); - } - - @Override - public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { - return source.map(mapper); - } - - @Override - public IntStream mapToInt(ToIntFunction<? super T> mapper) { - return source.mapToInt(mapper); - } - - @Override - public LongStream mapToLong(ToLongFunction<? super T> mapper) { - return source.mapToLong(mapper); - } - - @Override - public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { - return source.mapToDouble(mapper); - } - - @Override - public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { - return source.flatMap(mapper); - } - - @Override - public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { - return source.flatMapToInt(mapper); - } - - @Override - public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { - return source.flatMapToLong(mapper); - } - - @Override - public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { - return source.flatMapToDouble(mapper); - } - - @Override - public Stream<T> distinct() { - return source.distinct(); - } - - @Override - public Stream<T> sorted() { - return source.sorted(); - } - - @Override - public Stream<T> sorted(Comparator<? super T> comparator) { - return source.sorted(comparator); - } - - @Override - public Stream<T> peek(Consumer<? super T> action) { - return source.peek(action); - } - - @Override - public Stream<T> limit(long maxSize) { - return source.limit(maxSize); - } - - @Override - public Stream<T> skip(long n) { - return source.skip(n); - } - -/* - @Override - public Stream<T> takeWhile(Predicate<? super T> predicate) { - return source.takeWhile(predicate); - } - - @Override - public Stream<T> dropWhile(Predicate<? super T> predicate) { - return source.dropWhile(predicate); - } -*/ - - @Override - public void forEach(Consumer<? super T> action) { - source.forEach(action); - } - - @Override - public void forEachOrdered(Consumer<? super T> action) { - source.forEachOrdered(action); - } - - @Override - public Object[] toArray() { - return source.toArray(); - } - - @Override - public <A> A[] toArray(IntFunction<A[]> generator) { - return source.toArray(generator); - } - - @Override - public T reduce(T identity, BinaryOperator<T> accumulator) { - return source.reduce(identity, accumulator); - } - - @Override - public Optional<T> reduce(BinaryOperator<T> accumulator) { - return source.reduce(accumulator); - } - - @Override - public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { - return source.reduce(identity, accumulator, combiner); - } - - @Override - public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { - return source.collect(supplier, accumulator, combiner); - } - - @Override - public <R, A> R collect(Collector<? super T, A, R> collector) { - return source.collect(collector); - } - - @Override - public Optional<T> min(Comparator<? super T> comparator) { - return source.min(comparator); - } - - @Override - public Optional<T> max(Comparator<? super T> comparator) { - return source.max(comparator); - } - - @Override - public long count() { - return source.count(); - } - - @Override - public boolean anyMatch(Predicate<? super T> predicate) { - return source.anyMatch(predicate); - } - - @Override - public boolean allMatch(Predicate<? super T> predicate) { - return source.allMatch(predicate); - } - - @Override - public boolean noneMatch(Predicate<? super T> predicate) { - return source.noneMatch(predicate); - } - - @Override - public Optional<T> findFirst() { - return source.findFirst(); - } - - @Override - public Optional<T> findAny() { - return source.findAny(); - } - - @Override - public Iterator<T> iterator() { - return source.iterator(); - } - - @Override - public Spliterator<T> spliterator() { - return source.spliterator(); - } - - @Override - public boolean isParallel() { - return source.isParallel(); - } - - @Override - public Stream<T> sequential() { - return source.sequential(); - } - - @Override - public Stream<T> parallel() { - return source.parallel(); - } - - @Override - public Stream<T> unordered() { - return source.unordered(); - } - - @Override - public Stream<T> onClose(Runnable closeHandler) { - return source.onClose(closeHandler); - } - - @Override - public void close() { - source.close(); - } -} diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java new file mode 100644 index 0000000..22e0fe1 --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.util; + +import java.util.DoubleSummaryStatistics; +import java.util.OptionalDouble; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.DoubleBinaryOperator; +import java.util.function.DoubleConsumer; +import java.util.function.DoubleFunction; +import java.util.function.DoublePredicate; +import java.util.function.DoubleToIntFunction; +import java.util.function.DoubleToLongFunction; +import java.util.function.DoubleUnaryOperator; +import java.util.function.ObjDoubleConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * A specialization of {@link StreamDecoration} for {@link DoubleStream streams of double value}. + * + * @since 1.0 + * + * @author Alexis Manin (Geomatys) + */ +public abstract class DoubleStreamDecoration extends BaseStreamDecoration<Double, DoubleStream> implements DoubleStream { + + @Override + public DoubleStream filter(DoublePredicate predicate) { + return getOrCreate().filter(predicate); + } + + @Override + public DoubleStream map(DoubleUnaryOperator mapper) { + return getOrCreate().map(mapper); + } + + @Override + public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { + return getOrCreate().mapToObj(mapper); + } + + @Override + public IntStream mapToInt(DoubleToIntFunction mapper) { + return getOrCreate().mapToInt(mapper); + } + + @Override + public LongStream mapToLong(DoubleToLongFunction mapper) { + return getOrCreate().mapToLong(mapper); + } + + @Override + public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { + return getOrCreate().flatMap(mapper); + } + + @Override + public DoubleStream distinct() { + return getOrCreate().distinct(); + } + + @Override + public DoubleStream sorted() { + return getOrCreate().sorted(); + } + + @Override + public DoubleStream peek(DoubleConsumer action) { + return getOrCreate().peek(action); + } + + @Override + public DoubleStream limit(long maxSize) { + return getOrCreate().limit(maxSize); + } + + @Override + public DoubleStream skip(long n) { + return getOrCreate().skip(n); + } + + @Override + public void forEach(DoubleConsumer action) { + getOrCreate().forEach(action); + } + + @Override + public void forEachOrdered(DoubleConsumer action) { + getOrCreate().forEachOrdered(action); + } + + @Override + public double[] toArray() { + return getOrCreate().toArray(); + } + + @Override + public double reduce(double identity, DoubleBinaryOperator op) { + return getOrCreate().reduce(identity, op); + } + + @Override + public OptionalDouble reduce(DoubleBinaryOperator op) { + return getOrCreate().reduce(op); + } + + @Override + public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) { + return getOrCreate().collect(supplier, accumulator, combiner); + } + + @Override + public double sum() { + return getOrCreate().sum(); + } + + @Override + public OptionalDouble min() { + return getOrCreate().min(); + } + + @Override + public OptionalDouble max() { + return getOrCreate().max(); + } + + @Override + public long count() { + return getOrCreate().count(); + } + + @Override + public OptionalDouble average() { + return getOrCreate().average(); + } + + @Override + public DoubleSummaryStatistics summaryStatistics() { + return getOrCreate().summaryStatistics(); + } + + @Override + public boolean anyMatch(DoublePredicate predicate) { + return getOrCreate().anyMatch(predicate); + } + + @Override + public boolean allMatch(DoublePredicate predicate) { + return getOrCreate().allMatch(predicate); + } + + @Override + public boolean noneMatch(DoublePredicate predicate) { + return getOrCreate().noneMatch(predicate); + } + + @Override + public OptionalDouble findFirst() { + return getOrCreate().findFirst(); + } + + @Override + public OptionalDouble findAny() { + return getOrCreate().findAny(); + } + + @Override + public Stream<Double> boxed() { + return getOrCreate().boxed(); + } + + @Override + public PrimitiveIterator.OfDouble iterator() { + return getOrCreate().iterator(); + } + + @Override + public Spliterator.OfDouble spliterator() { + return getOrCreate().spliterator(); + } +} diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java new file mode 100644 index 0000000..75f143f --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.util; + +import java.util.Comparator; +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collector; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * Allows for delegation for a subset of behaviors of a stream to a custom component. + * How it works: This class is simply delegating all {@link Stream} interface operations to the one provided by + * {@link #createDecoratedStream()} implementation. It allows implementations to short-circuit a certain number of + * methods to provide their own solution. For example, if you've got a streamable dataset whose count can be provided + * efficiently by a third-party tool (typically COUNT query on SQL databases), you can create a {@link StreamDecoration} + * with an overrided count method running the count query instead of counting elements of the stream manually. + * + * Another exemple would be intermediate operations. Let's keep SQL queries as example. If you create a stream executing + * a statement on terminal operations, you could override {@link Stream#limit(long)} and {@link Stream#skip(long)} + * methods to set LIMIT and OFFSET criterias in an SQL query. + * + * For an advanced example, you can look at {@link org.apache.sis.internal.sql.feature.StreamSQL} class. + * + * @param <T> The type of objects contained in the stream, as specified in {@link Stream} interface. + * + * @since 1.0 + * + * @author Alexis Manin (Geomatys) + */ +public abstract class StreamDecoration<T> extends BaseStreamDecoration<T, Stream<T>> implements Stream<T> { + + @Override + public Stream<T> filter(Predicate<? super T> predicate) { + return getOrCreate().filter(predicate); + } + + @Override + public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { + return getOrCreate().map(mapper); + } + + @Override + public IntStream mapToInt(ToIntFunction<? super T> mapper) { + return getOrCreate().mapToInt(mapper); + } + + @Override + public LongStream mapToLong(ToLongFunction<? super T> mapper) { + return getOrCreate().mapToLong(mapper); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { + return getOrCreate().mapToDouble(mapper); + } + + @Override + public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { + return getOrCreate().flatMap(mapper); + } + + @Override + public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { + return getOrCreate().flatMapToInt(mapper); + } + + @Override + public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { + return getOrCreate().flatMapToLong(mapper); + } + + @Override + public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { + return getOrCreate().flatMapToDouble(mapper); + } + + @Override + public Stream<T> distinct() { + return getOrCreate().distinct(); + } + + @Override + public Stream<T> sorted() { + return getOrCreate().sorted(); + } + + @Override + public Stream<T> sorted(Comparator<? super T> comparator) { + return getOrCreate().sorted(comparator); + } + + @Override + public Stream<T> peek(Consumer<? super T> action) { + return getOrCreate().peek(action); + } + + @Override + public Stream<T> limit(long maxSize) { + return getOrCreate().limit(maxSize); + } + + @Override + public Stream<T> skip(long n) { + return getOrCreate().skip(n); + } + + @Override + public void forEach(Consumer<? super T> action) { + getOrCreate().forEach(action); + } + + @Override + public void forEachOrdered(Consumer<? super T> action) { + getOrCreate().forEachOrdered(action); + } + + @Override + public Object[] toArray() { + return getOrCreate().toArray(); + } + + @Override + public <A> A[] toArray(IntFunction<A[]> generator) { + return getOrCreate().toArray(generator); + } + + @Override + public T reduce(T identity, BinaryOperator<T> accumulator) { + return getOrCreate().reduce(identity, accumulator); + } + + @Override + public Optional<T> reduce(BinaryOperator<T> accumulator) { + return getOrCreate().reduce(accumulator); + } + + @Override + public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { + return getOrCreate().reduce(identity, accumulator, combiner); + } + + @Override + public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { + return getOrCreate().collect(supplier, accumulator, combiner); + } + + @Override + public <R, A> R collect(Collector<? super T, A, R> collector) { + return getOrCreate().collect(collector); + } + + @Override + public Optional<T> min(Comparator<? super T> comparator) { + return getOrCreate().min(comparator); + } + + @Override + public Optional<T> max(Comparator<? super T> comparator) { + return getOrCreate().max(comparator); + } + + @Override + public long count() { + return getOrCreate().count(); + } + + @Override + public boolean anyMatch(Predicate<? super T> predicate) { + return getOrCreate().anyMatch(predicate); + } + + @Override + public boolean allMatch(Predicate<? super T> predicate) { + return getOrCreate().allMatch(predicate); + } + + @Override + public boolean noneMatch(Predicate<? super T> predicate) { + return getOrCreate().noneMatch(predicate); + } + + @Override + public Optional<T> findFirst() { + return getOrCreate().findFirst(); + } + + @Override + public Optional<T> findAny() { + return getOrCreate().findAny(); + } +} diff --git a/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java b/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java index 0c22f0f..a4e3146 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java +++ b/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java @@ -16,15 +16,17 @@ */ package org.apache.sis.util; -import java.util.Map; // For javadoc import java.util.BitSet; -import org.opengis.referencing.cs.CoordinateSystem; -import org.opengis.referencing.crs.CoordinateReferenceSystem; -import org.opengis.geometry.Envelope; +import java.util.Collection; +import java.util.Map; + import org.opengis.geometry.DirectPosition; +import org.opengis.geometry.Envelope; import org.opengis.geometry.MismatchedDimensionException; -import org.apache.sis.internal.util.Strings; +import org.opengis.referencing.crs.CoordinateReferenceSystem; +import org.opengis.referencing.cs.CoordinateSystem; +import org.apache.sis.internal.util.Strings; import org.apache.sis.util.resources.Errors; @@ -190,6 +192,24 @@ public final class ArgumentChecks extends Static { } /** + * Makes sure that given collection is non-null and non-empty. If it is null, then a {@link NullArgumentException} + * is thrown. Otherwise if it {@link Collection#isEmpty() is empty}, then an {@link IllegalArgumentException} is thrown. + * + * @param name the name of the argument to be checked. Used only if an exception is thrown. + * @param toCheck the user argument to check against null value and empty collection. + * @throws NullArgumentException if {@code toCheck} is null. + * @throws IllegalArgumentException if {@code toCheck} is empty. + */ + public static void ensureNonEmpty(final String name, final Collection<?> toCheck) { + if (toCheck == null) { + throw new NullArgumentException(Errors.format(Errors.Keys.NullArgument_1, name)); + } + if (toCheck.isEmpty()) { + throw new IllegalArgumentException(Errors.format(Errors.Keys.EmptyArgument_1, name)); + } + } + + /** * Ensures that the given {@code values} array is non-null and non-empty. This method can also ensures that all values * are between the given bounds (inclusive) and are distinct. The distinct values requirement is useful for validating * arrays of spatiotemporal dimension indices, where dimensions can not be repeated. diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java new file mode 100644 index 0000000..f2d229d --- /dev/null +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java @@ -0,0 +1,60 @@ +package org.apache.sis.internal.sql.feature; + +import java.util.Objects; + +import org.apache.sis.internal.metadata.sql.SQLBuilder; + +import static org.apache.sis.util.ArgumentChecks.ensureNonNull; + +/** + * A column reference. Specify name of the column, and optionally an alias to use for public visibility. + * By default, column has no alias. To create a column with an alias, use {@code ColumnRef myCol = new ColumnRef("colName).as("myAlias");} + */ +public final class ColumnRef { + final String name; + final String alias; + final String attrName; + + public ColumnRef(String name) { + ensureNonNull("Column name", name); + this.name = this.attrName = name; + alias = null; + } + + private ColumnRef(final String name, final String alias) { + ensureNonNull("Column alias", alias); + this.name = name; + this.alias = this.attrName = alias; + } + + ColumnRef as(final String alias) { + return new ColumnRef(name, alias); + } + + SQLBuilder append(final SQLBuilder target) { + target.appendIdentifier(name); + if (alias != null) { + target.append(" AS ").appendIdentifier(alias); + } + + return target; + } + + public String getAttributeName() { + return attrName; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ColumnRef)) return false; + ColumnRef columnRef = (ColumnRef) o; + return name.equals(columnRef.name) && + Objects.equals(alias, columnRef.alias); + } + + @Override + public int hashCode() { + return Objects.hash(name, alias); + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java index 57e9c1f..0934bf7 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java @@ -16,29 +16,37 @@ */ package org.apache.sis.internal.sql.feature; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.Collection; -import java.util.Spliterator; -import java.util.function.Consumer; +import java.lang.reflect.Array; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.Statement; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.lang.reflect.Array; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.opengis.feature.Feature; +import org.opengis.feature.FeatureType; + import org.apache.sis.internal.metadata.sql.SQLBuilder; +import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.InternalDataStoreException; +import org.apache.sis.util.ArraysExt; import org.apache.sis.util.collection.BackingStoreException; import org.apache.sis.util.collection.WeakValueHashMap; -import org.apache.sis.util.ArraysExt; + +import static org.apache.sis.util.ArgumentChecks.ensureNonEmpty; +import static org.apache.sis.util.ArgumentChecks.ensureNonNull; // Branch-dependent imports -import org.opengis.feature.Feature; -import org.opengis.feature.FeatureType; /** @@ -141,25 +149,45 @@ final class Features implements Spliterator<Feature>, Runnable { /** * Creates a new iterator over the feature instances. + * TODO: This object is far too complicated. A builder of some sort should be used. We should even consider a + * third-party tool like JOOQ, which is a great abstraction for SQL query building. * * @param table the table for which we are creating an iterator. * @param connection connection to the database. - * @param attributeNames value of {@link Table#attributeNames}: where to store simple values. - * @param attributeColumns value of {@link Table#attributeColumns}: often the same as attribute names. - * @param importedKeys value of {@link Table#importedKeys}: targets of this table foreign keys. - * @param exportedKeys value of {@link Table#exportedKeys}: foreigner keys of other tables. + * @param columns Names of columns to read, along with an eventual alias. The alias (or name if no alias + * is provided) must match a property name in output feature type. * @param following the relations that we are following. Used for avoiding never ending loop. * @param noFollow relation to not follow, or {@code null} if none. + * @param distinct True if we should return only distinct result, false otherwise. + * @param offset An offset (nuber of rows to skip) in underlying SQL query. A negative or zeero value + * means no offset will be set. + * @param limit Maximum number of rows to return. Corresponds to a LIMIT statement in underlying SQL + * query. A negative or 0 value means no limit will be set. */ - Features(final Table table, final Connection connection, final String[] attributeNames, final String[] attributeColumns, - final Relation[] importedKeys, final Relation[] exportedKeys, final List<Relation> following, final Relation noFollow) + Features(final Table table, final Connection connection, final Collection<ColumnRef> columns, + final List<Relation> following, final Relation noFollow, + boolean distinct, final long offset, final long limit) throws SQLException, InternalDataStoreException { + ensureNonEmpty("Columns to fetch", columns); + String[] attributeColumns = new String[columns.size()]; + attributeNames = new String[attributeColumns.length]; + int i = 0; + for (ColumnRef column : columns) { + attributeColumns[i] = column.name; + attributeNames[i++] = column.getAttributeName(); + } this.featureType = table.featureType; - this.attributeNames = attributeNames; final DatabaseMetaData metadata = connection.getMetaData(); estimatedSize = following.isEmpty() ? table.countRows(metadata, true) : 0; + /* + * Create a SELECT clause with all columns that are ordinary attributes. Order matter, since 'Features' + * iterator will map the columns to the attributes listed in the 'attributeNames' array in that order. + * Moreover, we optionaly add a "distinct" clause on user request. + */ final SQLBuilder sql = new SQLBuilder(metadata, true).append("SELECT"); + if (distinct) sql.append(" DISTINCT"); + final Map<String,Integer> columnIndices = new HashMap<>(); /* * Create a SELECT clause with all columns that are ordinary attributes. @@ -169,12 +197,13 @@ final class Features implements Spliterator<Feature>, Runnable { for (String column : attributeColumns) { appendColumn(sql, column, columnIndices); } + /* * Collect information about associations in local arrays before to assign * them to the final fields, because some array lengths may be adjusted. */ - int importCount = (importedKeys != null) ? importedKeys.length : 0; - int exportCount = (exportedKeys != null) ? exportedKeys.length : 0; + int importCount = (table.importedKeys != null) ? table.importedKeys.length : 0; + int exportCount = (table.exportedKeys != null) ? table.exportedKeys.length : 0; int totalCount = importCount + exportCount; if (totalCount == 0) { dependencies = EMPTY; @@ -192,7 +221,7 @@ final class Features implements Spliterator<Feature>, Runnable { */ if (importCount != 0) { importCount = 0; // We will recount. - for (final Relation dependency : importedKeys) { + for (final Relation dependency : table.importedKeys) { if (dependency != noFollow) { dependency.startFollowing(following); // Safety against never-ending recursivity. associationNames [importCount] = dependency.propertyName; @@ -212,8 +241,8 @@ final class Features implements Spliterator<Feature>, Runnable { * associations we need to iterate over all "Parks" rows referencing the city. */ if (exportCount != 0) { - int i = importCount; - for (final Relation dependency : exportedKeys) { + i = importCount; + for (final Relation dependency : table.exportedKeys) { dependency.startFollowing(following); // Safety against never-ending recursivity. final Table foreigner = dependency.getSearchTable(); final Relation inverse = foreigner.getInverseOf(dependency, table.name); @@ -241,7 +270,13 @@ final class Features implements Spliterator<Feature>, Runnable { statement = null; instances = null; // A future SIS version could use the map opportunistically if it exists. keyComponentClass = null; - result = connection.createStatement().executeQuery(sql.toString()); + addOffsetLimit(sql, offset, limit); + final Statement statement = connection.createStatement(); + /* Why this parameter ? See: https://gitlab.geomatys.com/geomatys-group/knowledge-base/wikis/cookbook/jdbc + * TODO : allow parameterization ? + */ + statement.setFetchSize(100); + result = statement.executeQuery(sql.toString()); } else { final Relation componentOf = following.get(following.size() - 1); String separator = " WHERE "; @@ -249,6 +284,7 @@ final class Features implements Spliterator<Feature>, Runnable { sql.append(separator).appendIdentifier(primaryKey).append("=?"); separator = " AND "; } + addOffsetLimit(sql, offset, limit); statement = connection.prepareStatement(sql.toString()); /* * Following assumes that the foreigner key references the primary key of this table, @@ -266,11 +302,22 @@ final class Features implements Spliterator<Feature>, Runnable { } /** + * If a limit or an offset is appended, a space will be added beforehand to the given builder. + * @param toEdit The builder to add offset and limit to. + * @param offset The offset to use. If zero or negative, it will be ignored. + * @param limit the value for limit parameter. If zero or negative, it will be ignored. + */ + private static void addOffsetLimit(final SQLBuilder toEdit, final long offset, final long limit) { + if (limit > 0) toEdit.append(" LIMIT ").append(limit); + if (offset > 0) toEdit.append(" OFFSET ").append(offset); + } + + /** * Appends a columns in the given builder and remember the column indices. * An exception is thrown if the column has already been added (should never happen). */ private static int appendColumn(final SQLBuilder sql, final String column, - final Map<String,Integer> columnIndices) throws InternalDataStoreException + final Map<String,Integer> columnIndices) throws InternalDataStoreException { int columnCount = columnIndices.size(); if (columnCount != 0) sql.append(','); @@ -283,8 +330,7 @@ final class Features implements Spliterator<Feature>, Runnable { * Computes the 1-based indices of given columns, adding the columns in the given builder if necessary. */ private static int[] getColumnIndices(final SQLBuilder sql, final Collection<String> columns, - final Map<String,Integer> columnIndices) throws InternalDataStoreException - { + final Map<String,Integer> columnIndices) throws InternalDataStoreException { int i = 0; final int[] indices = new int[columns.size()]; for (final String column : columns) { @@ -496,4 +542,78 @@ final class Features implements Spliterator<Feature>, Runnable { throw new BackingStoreException(e); } } + + /** + * Useful to customiez value retrieval on result sets. Example: + * {@code + * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt; + * } + * @param <T> + * @param <U> + * @param <R> + */ + @FunctionalInterface + interface SQLBiFunction<T, U, R> { + R apply(T t, U u) throws SQLException; + + /** + * Returns a composed function that first applies this function to + * its input, and then applies the {@code after} function to the result. + * If evaluation of either function throws an exception, it is relayed to + * the caller of the composed function. + * + * @param <V> the type of output of the {@code after} function, and of the + * composed function + * @param after the function to apply after this function is applied + * @return a composed function that first applies this function and then + * applies the {@code after} function + * @throws NullPointerException if after is null + */ + default <V> SQLBiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) { + ensureNonNull("After function", after); + return (T t, U u) -> after.apply(apply(t, u)); + } + } + + static class Builder { + + final Table parent; + long limit, offset; + boolean distinct; + + Builder(Table parent) { + this.parent = parent; + } + + /** + * Warning : This does not work with relations. It is only a rough estimation of the parameterized query. + * @param count True if a count query must be generated. False for a simple selection. + * @return A text representing (roughly) the SQL query which will be posted. + * @throws SQLException If we cannot initialize an sql statement builder. + */ + String getSnapshot(final boolean count) throws SQLException { + final SQLBuilder sql = new SQLBuilder(parent.dbMeta, true).append("SELECT "); + if (count) sql.append("COUNT("); + if (distinct) sql.append("DISTINCT "); + // If we want a count and no distinct clause is specified, we can query it for a single column. + if (count && !distinct) sql.appendIdentifier(parent.attributes.get(0).name); + else { + final Iterator<ColumnRef> it = parent.attributes.iterator(); + sql.appendIdentifier(it.next().name); + while (it.hasNext()) { + sql.append(',').appendIdentifier(it.next().name); + } + } + + if (count) sql.append(')'); + sql.append(" FROM ").appendIdentifier(parent.name.catalog, parent.name.schema, parent.name.table); + addOffsetLimit(sql, offset, limit); + + return sql.toString(); + } + + Features build(final Connection conn) throws SQLException, DataStoreException { + return new Features(parent, conn, parent.attributes, new ArrayList<>(), null, distinct, offset, limit); + } + } } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java index 7ef4e90..d01dbdf 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java @@ -17,16 +17,18 @@ package org.apache.sis.internal.sql.feature; import java.math.BigDecimal; -import java.time.LocalDate; -import java.time.LocalTime; -import java.time.LocalDateTime; -import java.time.OffsetTime; -import java.time.OffsetDateTime; -import java.sql.Types; +import java.sql.DatabaseMetaData; +import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.DatabaseMetaData; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.OffsetDateTime; +import java.time.OffsetTime; + import org.opengis.referencing.crs.CoordinateReferenceSystem; + import org.apache.sis.internal.metadata.sql.Reflection; import org.apache.sis.setup.GeometryLibrary; @@ -112,9 +114,9 @@ class SpatialFunctions { case Types.CHAR: case Types.VARCHAR: case Types.LONGVARCHAR: return String.class; - case Types.DATE: return LocalDate.class; - case Types.TIME: return LocalTime.class; - case Types.TIMESTAMP: return LocalDateTime.class; + case Types.DATE: return Date.class; + case Types.TIME: return Time.class; + case Types.TIMESTAMP: return Timestamp.class; case Types.TIME_WITH_TIMEZONE: return OffsetTime.class; case Types.TIMESTAMP_WITH_TIMEZONE: return OffsetDateTime.class; case Types.BINARY: diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java new file mode 100644 index 0000000..f6d8c3d --- /dev/null +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.sql.feature; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.DoubleFunction; +import java.util.function.DoubleUnaryOperator; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.opengis.feature.Feature; + +import org.apache.sis.internal.util.DoubleStreamDecoration; +import org.apache.sis.internal.util.StreamDecoration; +import org.apache.sis.storage.DataStoreException; +import org.apache.sis.util.collection.BackingStoreException; + +import static org.apache.sis.util.ArgumentChecks.ensureNonNull; + +/** + * Manages query lifecycle and optimizations. Operations like {@link #count()}, {@link #distinct()}, {@link #skip(long)} + * and {@link #limit(long)} are delegated to underlying SQL database. This class consistently propagate optimisation + * strategies through streams obtained using {@link #map(Function)}, {@link #mapToDouble(ToDoubleFunction)} and + * {@link #peek(Consumer)} operations. However, for result consistency, no optimization is stacked once either + * {@link #filter(Predicate)} or {@link #flatMap(Function)} operations are called, as they modify browing flow (the + * count of stream elements is not bound 1 to 1 to query result rows). + * + * @since 1.0 + * + * @author Alexis Manin (Geomatys) + * + */ +class StreamSQL extends StreamDecoration<Feature> { + + final Features.Builder queryBuilder; + boolean parallel; + + private Consumer<? super Feature> peekAction; + + StreamSQL(final Table source) { + this(new Features.Builder(source)); + } + + StreamSQL(Features.Builder builder) { + this.queryBuilder = builder; + } + + @Override + public <R> Stream<R> map(Function<? super Feature, ? extends R> mapper) { + return new MappedStream<>(mapper, this); + } + + @Override + public IntStream mapToInt(ToIntFunction<? super Feature> mapper) { + return super.mapToInt(mapper); + } + + @Override + public LongStream mapToLong(ToLongFunction<? super Feature> mapper) { + return super.mapToLong(mapper); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction<? super Feature> mapper) { + return super.mapToDouble(mapper); + } + + @Override + public Stream<Feature> parallel() { + parallel = true; + return this; + } + + @Override + public Stream<Feature> sequential() { + parallel = false; + return this; + } + + @Override + public Stream<Feature> distinct() { + queryBuilder.distinct = true; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Stream<Feature> peek(Consumer<? super Feature> action) { + if (peekAction == null) { + peekAction = action; + } else { + // Safe cast, because Stream values are strongly typed to O. + peekAction = peekAction.andThen((Consumer) action); + } + + return this; + } + + @Override + public Stream<Feature> limit(long maxSize) { + if (queryBuilder.limit < 1) queryBuilder.limit = maxSize; + else queryBuilder.limit = Math.min(queryBuilder.limit, maxSize); + return this; + } + + @Override + public Stream<Feature> skip(long n) { + queryBuilder.offset += n; + return this; + } + + @Override + public long count() { + // Avoid opening a connection if sql text cannot be evaluated. + final String sql; + try { + sql = queryBuilder.getSnapshot(true); + } catch (SQLException e) { + throw new BackingStoreException("Cannot create SQL COUNT query", e); + } + try (Connection conn = queryBuilder.parent.source.getConnection()) { + try (Statement st = conn.createStatement(); + ResultSet rs = st.executeQuery(sql)) { + if (rs.next()) { + return rs.getLong(1); + } else return 0; + } + } catch (SQLException e) { + throw new BackingStoreException("Cannot estimate feature set size using SQL COUNT query", e); + } + } + + @Override + protected synchronized Stream<Feature> createDecoratedStream() { + final AtomicReference<Connection> connectionRef = new AtomicReference<>(); + Stream<Feature> featureStream = Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection())) + .map(Supplier::get) + .peek(connectionRef::set) + .flatMap(conn -> { + try { + final Features iter = queryBuilder.build(conn); + return StreamSupport.stream(iter, parallel).onClose(iter); + } catch (SQLException | DataStoreException e) { + throw new BackingStoreException(e); + } + }) + .onClose(() -> queryBuilder.parent.closeRef(connectionRef)); + if (peekAction != null) featureStream = featureStream.peek(peekAction); + return featureStream; + } + + /** + * Transform a callable into supplier by catching any potential verified exception and rethrowing it as a {@link BackingStoreException}. + * @param generator The callable to use in a non-verified error context. Must not be null. + * @param <T> The return type of input callable. + * @return A supplier that delegates work to given callable, wrapping any verified exception in the process. + */ + private static <T> Supplier<T> uncheck(final Callable<T> generator) { + ensureNonNull("Generator", generator); + return () -> { + try { + return generator.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new BackingStoreException(e); + } + }; + } + + /** + * Describes a stream on which a {@link Stream#map(Function) mapping operation} has been set. It serves to delegate + * optimizable operation to underlying sql stream (which could be an indirect parent). + * + * @param <I> Type of object received as input of mapping operation. + * @param <O> Return type of mapping operation. + */ + private static final class MappedStream<I, O> extends StreamDecoration<O> { + private Function<? super I, ? extends O> mapper; + private Stream<I> source; + + private MappedStream(Function<? super I, ? extends O> mapper, Stream<I> source) { + this.mapper = mapper; + this.source = source; + } + + @Override + public Stream<O> peek(Consumer<? super O> action) { + mapper = concatenate(mapper, action); + return this; + } + + @Override + public Stream<O> distinct() { + source = source.distinct(); + return this; + } + + @Override + public Stream<O> limit(long maxSize) { + source = source.limit(maxSize); + return this; + } + + @Override + public Stream<O> skip(long n) { + source = source.skip(n); + return this; + } + + @Override + public long count() { + return source.count(); + } + + @Override + public boolean isParallel() { + return source.isParallel(); + } + + @Override + public Stream<O> sequential() { + source = source.sequential(); + return this; + } + + @Override + public Stream<O> parallel() { + source = source.parallel(); + return this; + } + + @Override + public <R> Stream<R> map(Function<? super O, ? extends R> mapper) { + return new MappedStream<>(this.mapper.andThen(mapper), source); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction<? super O> mapper) { + return new ToDoubleStream<I>(source, i -> mapper.applyAsDouble(this.mapper.apply(i))); + } + + @Override + protected Stream<O> createDecoratedStream() { + // Break possible infinite loop by sinking source content through its spliterator (terminal op). + final Stream<I> sink = StreamSupport.stream(source.spliterator(), source.isParallel()); + sink.onClose(source::close); + return sink.map(mapper); + } + } + + /** + * Same purpose as {@link MappedStream}, but specialized for {@link Stream#mapToDouble(ToDoubleFunction) double mapping} + * operations. + * + * @param <T> Type of objects contained in source stream (before double mapping). + */ + private static final class ToDoubleStream<T> extends DoubleStreamDecoration { + + Stream<T> source; + ToDoubleFunction<T> toDouble; + + private ToDoubleStream(Stream<T> source, ToDoubleFunction<T> toDouble) { + this.source = source; + this.toDouble = toDouble; + } + + @Override + public DoubleStream peek(DoubleConsumer action) { + final ToDoubleFunction<T> toDoubleFixedRef = toDouble; + toDouble = t -> { + final double value = toDoubleFixedRef.applyAsDouble(t); + action.accept(value); + return value; + }; + return this; + } + + @Override + public DoubleStream map(DoubleUnaryOperator mapper) { + return new ToDoubleStream<T>(source, t -> mapper.applyAsDouble(toDouble.applyAsDouble(t))); + } + + @Override + public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { + return new MappedStream<T, U>(t -> mapper.apply(toDouble.applyAsDouble(t)), source); + } + + @Override + public DoubleStream distinct() { + source = source.distinct(); + return this; + } + + @Override + public DoubleStream limit(long maxSize) { + source = source.limit(maxSize); + return this; + } + + @Override + public DoubleStream skip(long n) { + source = source.skip(n); + return this; + } + + @Override + public long count() { + return source.count(); + } + + @Override + public Stream<Double> boxed() { + return new MappedStream<>(t -> Double.valueOf(toDouble.applyAsDouble(t)), source); + } + + @Override + public boolean isParallel() { + return source.isParallel(); + } + + @Override + public DoubleStream sequential() { + source = source.sequential(); + return this; + } + + @Override + public DoubleStream parallel() { + source = source.parallel(); + return this; + } + + @Override + protected DoubleStream createDecoratedStream() { + // Break possible cycle by sinking source content through its spliterator (terminal op). + final Stream<T> sink = StreamSupport.stream(source.spliterator(), source.isParallel()); + sink.onClose(source::close); + return sink.mapToDouble(toDouble); + } + } + + private static <I, O> Function<? super I, ? extends O> concatenate(final Function<? super I, ? extends O> function, final Consumer<? super O> consumer) { + return i -> { + final O o = function.apply(i); + consumer.accept(o); + return o; + }; + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java index 380669b..321561c 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java @@ -20,19 +20,16 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import javax.sql.DataSource; import org.opengis.feature.AttributeType; @@ -52,7 +49,6 @@ import org.apache.sis.internal.metadata.sql.SQLBuilder; import org.apache.sis.internal.metadata.sql.SQLUtilities; import org.apache.sis.internal.storage.AbstractFeatureSet; import org.apache.sis.internal.util.CollectionsExt; -import org.apache.sis.internal.util.DecoratedStream; import org.apache.sis.storage.DataStoreContentException; import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.InternalDataStoreException; @@ -60,7 +56,6 @@ import org.apache.sis.util.CharSequences; import org.apache.sis.util.Classes; import org.apache.sis.util.Debug; import org.apache.sis.util.Numbers; -import org.apache.sis.util.collection.BackingStoreException; import org.apache.sis.util.collection.TreeTable; import org.apache.sis.util.collection.WeakValueHashMap; @@ -83,7 +78,7 @@ final class Table extends AbstractFeatureSet { /** * Provider of (pooled) connections to the database. */ - private final DataSource source; + final DataSource source; /** * The structure of this table represented as a feature. Each feature attribute is a table column, @@ -98,18 +93,11 @@ final class Table extends AbstractFeatureSet { final TableReference name; /** - * Name of attributes in feature instances, excluding operations and associations to other tables. - * Those names are in the order of columns declared in the {@code SELECT <columns} statement. - * This array shall not be modified after construction. + * Name of all columns to fetch from database, optionally amended with an alias. Alias is used for feature type + * attributes which have been renamed to avoid name collisions. In any case, a call to {@link ColumnRef#getAttributeName()}} + * will return the name available in target feature type. */ - private final String[] attributeNames; - - /** - * Name of columns corresponding to each {@link #attributeNames}. This is often a reference to the - * same array than {@link #attributeNames}, but may be different if some attributes have been renamed - * for avoiding name collisions. - */ - private final String[] attributeColumns; + final List<ColumnRef> attributes; /** * The columns that constitute the primary key, or {@code null} if there is no primary key. @@ -120,13 +108,13 @@ final class Table extends AbstractFeatureSet { * The primary keys of other tables that are referenced by this table foreign key columns. * They are 0:1 relations. May be {@code null} if there is no imported keys. */ - private final Relation[] importedKeys; + final Relation[] importedKeys; /** * The foreign keys of other tables that reference this table primary key columns. * They are 0:N relations. May be {@code null} if there is no exported keys. */ - private final Relation[] exportedKeys; + final Relation[] exportedKeys; /** * The class of primary key values, or {@code null} if there is no primary keys. @@ -152,6 +140,11 @@ final class Table extends AbstractFeatureSet { final boolean hasGeometry; /** + * Keep a reference of target database metadata, to ease creation of {@link SQLBuilder}. + */ + final DatabaseMetaData dbMeta; + + /** * Creates a description of the table of the given name. * The table is identified by {@code id}, which contains a (catalog, schema, name) tuple. * The catalog and schema parts are optional and can be null, but the table is mandatory. @@ -165,6 +158,7 @@ final class Table extends AbstractFeatureSet { throws SQLException, DataStoreException { super(analyzer.listeners); + this.dbMeta = analyzer.metadata; this.source = analyzer.source; this.name = id; final String tableEsc = analyzer.escape(id.table); @@ -230,8 +224,7 @@ final class Table extends AbstractFeatureSet { boolean primaryKeyNonNull = true; boolean hasGeometry = false; int startWithLowerCase = 0; - final List<String> attributeNames = new ArrayList<>(); - final List<String> attributeColumns = new ArrayList<>(); + final List<ColumnRef> attributes = new ArrayList<>(); final FeatureTypeBuilder feature = new FeatureTypeBuilder(analyzer.nameFactory, analyzer.functions.library, analyzer.locale); try (ResultSet reflect = analyzer.metadata.getColumns(id.catalog, schemaEsc, tableEsc, null)) { while (reflect.next()) { @@ -251,6 +244,8 @@ final class Table extends AbstractFeatureSet { startWithLowerCase--; } } + + ColumnRef colRef = new ColumnRef(column); /* * Add the column as an attribute. Foreign keys are excluded (they will be replaced by associations), * except if the column is also a primary key. In the later case we need to keep that column because @@ -258,8 +253,6 @@ final class Table extends AbstractFeatureSet { */ AttributeTypeBuilder<?> attribute = null; if (isPrimaryKey || dependencies == null) { - attributeNames.add(column); - attributeColumns.add(column); final String typeName = reflect.getString(Reflection.TYPE_NAME); Class<?> type = analyzer.functions.toJavaType(reflect.getInt(Reflection.DATA_TYPE), typeName); if (type == null) { @@ -339,12 +332,14 @@ final class Table extends AbstractFeatureSet { */ if (attribute != null) { attribute.setName(analyzer.nameFactory.createGenericName(null, "pk", column)); - attributeNames.set(attributeNames.size() - 1, attribute.getName().toString()); + colRef = colRef.as(attribute.getName().toString()); attribute = null; } } } } + + attributes.add(colRef); } } /* @@ -420,9 +415,7 @@ final class Table extends AbstractFeatureSet { this.exportedKeys = toArray(exportedKeys); this.primaryKeyClass = primaryKeyClass; this.hasGeometry = hasGeometry; - this.attributeNames = attributeNames.toArray(new String[attributeNames.size()]); - this.attributeColumns = attributeColumns.equals(attributeNames) ? this.attributeNames - : attributeColumns.toArray(new String[attributeColumns.size()]); + this.attributes = Collections.unmodifiableList(attributes); } /** @@ -501,8 +494,8 @@ final class Table extends AbstractFeatureSet { @Debug final void appendTo(TreeTable.Node parent) { parent = Relation.newChild(parent, featureType.getName().toString()); - for (final String attribute : attributeNames) { - TableReference.newChild(parent, attribute); + for (final ColumnRef attribute : attributes) { + TableReference.newChild(parent, attribute.getAttributeName()); } appendAll(parent, importedKeys, " → "); appendAll(parent, exportedKeys, " ← "); @@ -610,22 +603,10 @@ final class Table extends AbstractFeatureSet { */ @Override public Stream<Feature> features(final boolean parallel) throws DataStoreException { - final AtomicReference<Connection> connectionRef = new AtomicReference<>(); - final Stream<Feature> featureStream = Stream.generate(uncheck(() -> source.getConnection())) - .peek(connectionRef::set) - .flatMap(conn -> { - try { - final Features iter = features(conn, new ArrayList<>(), null); - return StreamSupport.stream(iter, parallel).onClose(iter); - } catch (SQLException | InternalDataStoreException e) { - throw new BackingStoreException(e); - } - }) - .onClose(() -> closeRef(connectionRef)); - return new CountOverload<>(featureStream); + return new StreamSQL(this); } - private void closeRef(final AtomicReference<Connection> ref) { + void closeRef(final AtomicReference<Connection> ref) { final Connection conn = ref.get(); if (conn != null) { try { @@ -646,45 +627,6 @@ final class Table extends AbstractFeatureSet { final Features features(final Connection connection, final List<Relation> following, final Relation noFollow) throws SQLException, InternalDataStoreException { - return new Features(this, connection, attributeNames, attributeColumns, importedKeys, exportedKeys, following, noFollow); - } - - private class CountOverload<T> extends DecoratedStream<T> { - - CountOverload(Stream<T> source) { - super(source); - } - - @Override - public long count() { - try (Connection conn = Table.this.source.getConnection()) { - final String query = new SQLBuilder(conn.getMetaData(), true) - .append("SELECT COUNT(") - .appendIdentifier(attributeColumns[0]) - .append(')') - .append(" FROM ") - .appendIdentifier(name.catalog, name.schema, name.table) - .toString(); - try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(query)) { - if (rs.next()) { - return rs.getLong(1); - } else return 0; - } - } catch (SQLException e) { - throw new BackingStoreException("Cannot estimate feature set size using SQL COUNT query", e); - } - } - } - - private static <T> Supplier<T> uncheck(final Callable<T> generator) { - return () -> { - try { - return generator.call(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new BackingStoreException(e); - } - }; + return new Features(this, connection, attributes, following, noFollow, false, -1, -1); } } diff --git a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java index 7269f20..271bdb7 100644 --- a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java +++ b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java @@ -16,26 +16,37 @@ */ package org.apache.sis.storage.sql; -import java.util.Map; -import java.util.HashMap; -import java.util.HashSet; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; + +import org.opengis.feature.AttributeType; +import org.opengis.feature.Feature; +import org.opengis.feature.FeatureAssociationRole; +import org.opengis.feature.FeatureType; +import org.opengis.feature.PropertyType; + +import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.FeatureSet; import org.apache.sis.storage.StorageConnector; -import org.apache.sis.test.sql.TestDatabase; import org.apache.sis.test.TestCase; +import org.apache.sis.test.sql.TestDatabase; + import org.junit.Test; -import static org.apache.sis.test.Assert.*; +import static org.apache.sis.test.Assert.assertEquals; +import static org.apache.sis.test.Assert.assertInstanceOf; +import static org.apache.sis.test.Assert.assertNotEquals; +import static org.apache.sis.test.Assert.assertNotNull; +import static org.apache.sis.test.Assert.assertSame; +import static org.apache.sis.test.Assert.assertTrue; +import static org.apache.sis.test.Assert.fail; // Branch-dependent imports -import org.opengis.feature.Feature; -import org.opengis.feature.FeatureType; -import org.opengis.feature.PropertyType; -import org.opengis.feature.AttributeType; -import org.opengis.feature.FeatureAssociationRole; /** @@ -52,6 +63,13 @@ public final strictfp class SQLStoreTest extends TestCase { */ private static final String SCHEMA = "features"; + private static final int[] POPULATIONS = { + 13622267, // Tokyo, 2016. + 2206488, // Paris, 2017. + 1704694, // Montréal, 2016. + 531902 // Québec, 2016. + }; + /** * Number of time that the each country has been seen while iterating over the cities. */ @@ -130,6 +148,10 @@ public final strictfp class SQLStoreTest extends TestCase { try (Stream<Feature> features = cities.features(false)) { features.forEach((f) -> verifyContent(f)); } + + // Now, we'll check that overloaded stream operations are functionally stable, even stacked. + verifyStreamOperations(cities); + } } assertEquals(Integer.valueOf(2), countryCount.remove("CAN")); @@ -139,6 +161,53 @@ public final strictfp class SQLStoreTest extends TestCase { } /** + * Checks that operations stacked on feature stream are well executed. This test focus on mapping and peeking + * actions overloaded by sql streams. We'd like to test skip and limit operations too, but ignore it for now, + * because ordering of results matters for such a test. + * + * @implNote Most of stream operations used here are meaningless. We just want to ensure that the pipeline does not + * skip any operation. + * + * @param cities The feature set to read from. We expect a feature set containing all cities defined for the test + * class. + * @throws DataStoreException Let's propagate any error raised by input feature set. + */ + private static void verifyStreamOperations(final FeatureSet cities) throws DataStoreException { + try (Stream<Feature> features = cities.features(false)) { + final AtomicInteger peekCount = new AtomicInteger(); + final AtomicInteger mapCount = new AtomicInteger(); + final long populations = features.peek(f -> peekCount.incrementAndGet()) + .peek(f -> peekCount.incrementAndGet()) + .map(f -> { + mapCount.incrementAndGet(); + return f; + }) + .peek(f -> peekCount.incrementAndGet()) + .map(f -> { + mapCount.incrementAndGet(); + return f; + }) + .map(f -> f.getPropertyValue("population")) + .mapToDouble(obj -> ((Number) obj).doubleValue()) + .peek(f -> peekCount.incrementAndGet()) + .peek(f -> peekCount.incrementAndGet()) + .boxed() + .mapToDouble(d -> {mapCount.incrementAndGet(); return d;}) + .mapToObj(d -> {mapCount.incrementAndGet(); return d;}) + .mapToDouble(d -> {mapCount.incrementAndGet(); return d;}) + .map(d -> {mapCount.incrementAndGet(); return d;}) + .mapToLong(d -> (long) d) + .sum(); + + long expectedPopulations = 0; + for (long pop : POPULATIONS) expectedPopulations += pop; + assertEquals("Overall population count via Stream pipeline", expectedPopulations, populations); + assertEquals("Number of mapping (by element in the stream)", 24, mapCount.get()); + assertEquals("Number of peeking (by element in the stream)", 20, peekCount.get()); + } + } + + /** * Verifies the result of analyzing the structure of the {@code "Cities"} table. */ private static void verifyFeatureType(final FeatureType type, final String[] expectedNames, final Object[] expectedTypes) { @@ -178,7 +247,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Tōkyō"; country = "JPN"; countryName = "日本"; - population = 13622267; // In 2016. + population = POPULATIONS[0]; parks = new String[] {"Yoyogi-kōen", "Shinjuku Gyoen"}; break; } @@ -186,7 +255,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Paris"; country = "FRA"; countryName = "France"; - population = 2206488; // In 2017. + population = POPULATIONS[1]; parks = new String[] {"Tuileries Garden", "Luxembourg Garden"}; break; } @@ -194,7 +263,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Montreal"; country = "CAN"; countryName = "Canada"; - population = 1704694; // In 2016. + population = POPULATIONS[2]; isCanada = true; parks = new String[] {"Mount Royal"}; break; @@ -203,7 +272,7 @@ public final strictfp class SQLStoreTest extends TestCase { englishName = "Quebec"; country = "CAN"; countryName = "Canada"; - population = 531902; // In 2016. + population = POPULATIONS[3]; isCanada = true; parks = new String[] {}; break; diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/FeatureNaming.java b/storage/sis-storage/src/main/java/org/apache/sis/storage/FeatureNaming.java index 62e9235..e38d57c 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/storage/FeatureNaming.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/FeatureNaming.java @@ -36,7 +36,8 @@ import org.apache.sis.internal.storage.Resources; * actually puts no restriction on the kind of object associated to {@code GenericName}s; * {@link DataStore} implementations are free to choose their internal object. * Those objects can be stored and fetched using the {@code String} representation of their name - * as given by {@link GenericName#toString()}, or a shortened name when there is no ambiguity. + * as given by {@link GenericName#toString()}, or a shortened name when there is no ambiguity. Note that search is + * case sensitive. * * <div class="note"><b>Example:</b> * a data store may contain a {@code FeatureType} named {@code "foo:bar"}. @@ -146,7 +147,7 @@ public class FeatureNaming<E> { } /** - * Returns the value associated to the given name. + * Returns the value associated to the given name. Case sensitive. * * @param store the data store for which to get a value, or {@code null} if unknown. * @param name the name for which to get a value.
