This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit cd98b745530bb91b268a01ebce248eb859dd30f4 Author: Alexis Manin <[email protected]> AuthorDate: Wed Aug 21 13:17:34 2019 +0200 feat(SQL-Store): improve stream decoration to handle mappings to/from DoubleStream. --- .../sis/internal/util/BaseStreamDecoration.java | 68 ++++++++ .../sis/internal/util/DoubleStreamDecoration.java | 178 +++++++++++++++++++++ .../apache/sis/internal/util/StreamDecoration.java | 110 ++++--------- .../apache/sis/internal/sql/feature/StreamSQL.java | 149 ++++++++++++++++- 4 files changed, 426 insertions(+), 79 deletions(-) diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java new file mode 100644 index 0000000..f20e32d --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java @@ -0,0 +1,68 @@ +package org.apache.sis.internal.util; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.stream.BaseStream; + +public abstract class BaseStreamDecoration<T, S extends BaseStream<T, S>> implements BaseStream<T, S> { + + private S decorated; + + private boolean closed; + + /** + * Get previously created wrapped stream, or create it if never done. + * @return + */ + protected final S getOrCreate() { + if (closed) throw new IllegalStateException("Stream has already been closed."); + if (decorated == null) { + decorated = createDecoratedStream(); + } + + return decorated; + } + + protected abstract S createDecoratedStream(); + + @Override + public void close() { + closed = true; + if (decorated != null) decorated.close(); + } + + @Override + public Iterator<T> iterator() { + return getOrCreate().iterator(); + } + + @Override + public Spliterator<T> spliterator() { + return getOrCreate().spliterator(); + } + + @Override + public boolean isParallel() { + return getOrCreate().isParallel(); + } + + @Override + public S sequential() { + return getOrCreate().sequential(); + } + + @Override + public S parallel() { + return getOrCreate().parallel(); + } + + @Override + public S unordered() { + return getOrCreate().unordered(); + } + + @Override + public S onClose(Runnable closeHandler) { + return getOrCreate().onClose(closeHandler); + } +} diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java new file mode 100644 index 0000000..1215169 --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java @@ -0,0 +1,178 @@ +package org.apache.sis.internal.util; + +import java.util.DoubleSummaryStatistics; +import java.util.OptionalDouble; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.DoubleBinaryOperator; +import java.util.function.DoubleConsumer; +import java.util.function.DoubleFunction; +import java.util.function.DoublePredicate; +import java.util.function.DoubleToIntFunction; +import java.util.function.DoubleToLongFunction; +import java.util.function.DoubleUnaryOperator; +import java.util.function.ObjDoubleConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +public abstract class DoubleStreamDecoration extends BaseStreamDecoration<Double, DoubleStream> implements DoubleStream { + + @Override + public DoubleStream filter(DoublePredicate predicate) { + return getOrCreate().filter(predicate); + } + + @Override + public DoubleStream map(DoubleUnaryOperator mapper) { + return getOrCreate().map(mapper); + } + + @Override + public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { + return getOrCreate().mapToObj(mapper); + } + + @Override + public IntStream mapToInt(DoubleToIntFunction mapper) { + return getOrCreate().mapToInt(mapper); + } + + @Override + public LongStream mapToLong(DoubleToLongFunction mapper) { + return getOrCreate().mapToLong(mapper); + } + + @Override + public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { + return getOrCreate().flatMap(mapper); + } + + @Override + public DoubleStream distinct() { + return getOrCreate().distinct(); + } + + @Override + public DoubleStream sorted() { + return getOrCreate().sorted(); + } + + @Override + public DoubleStream peek(DoubleConsumer action) { + return getOrCreate().peek(action); + } + + @Override + public DoubleStream limit(long maxSize) { + return getOrCreate().limit(maxSize); + } + + @Override + public DoubleStream skip(long n) { + return getOrCreate().skip(n); + } + + @Override + public void forEach(DoubleConsumer action) { + getOrCreate().forEach(action); + } + + @Override + public void forEachOrdered(DoubleConsumer action) { + getOrCreate().forEachOrdered(action); + } + + @Override + public double[] toArray() { + return getOrCreate().toArray(); + } + + @Override + public double reduce(double identity, DoubleBinaryOperator op) { + return getOrCreate().reduce(identity, op); + } + + @Override + public OptionalDouble reduce(DoubleBinaryOperator op) { + return getOrCreate().reduce(op); + } + + @Override + public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) { + return getOrCreate().collect(supplier, accumulator, combiner); + } + + @Override + public double sum() { + return getOrCreate().sum(); + } + + @Override + public OptionalDouble min() { + return getOrCreate().min(); + } + + @Override + public OptionalDouble max() { + return getOrCreate().max(); + } + + @Override + public long count() { + return getOrCreate().count(); + } + + @Override + public OptionalDouble average() { + return getOrCreate().average(); + } + + @Override + public DoubleSummaryStatistics summaryStatistics() { + return getOrCreate().summaryStatistics(); + } + + @Override + public boolean anyMatch(DoublePredicate predicate) { + return getOrCreate().anyMatch(predicate); + } + + @Override + public boolean allMatch(DoublePredicate predicate) { + return getOrCreate().allMatch(predicate); + } + + @Override + public boolean noneMatch(DoublePredicate predicate) { + return getOrCreate().noneMatch(predicate); + } + + @Override + public OptionalDouble findFirst() { + return getOrCreate().findFirst(); + } + + @Override + public OptionalDouble findAny() { + return getOrCreate().findAny(); + } + + @Override + public Stream<Double> boxed() { + return getOrCreate().boxed(); + } + + @Override + public PrimitiveIterator.OfDouble iterator() { + return getOrCreate().iterator(); + } + + @Override + public Spliterator.OfDouble spliterator() { + return getOrCreate().spliterator(); + } +} diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java index 53637b2..45cd0f8 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java @@ -1,9 +1,7 @@ package org.apache.sis.internal.util; import java.util.Comparator; -import java.util.Iterator; import java.util.Optional; -import java.util.Spliterator; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BinaryOperator; @@ -21,81 +19,81 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.Stream; -public abstract class StreamDecoration<T> implements Stream<T> { +public abstract class StreamDecoration<T> extends BaseStreamDecoration<T, Stream<T>> implements Stream<T> { @Override public Stream<T> filter(Predicate<? super T> predicate) { - return getDecoratedStream().filter(predicate); + return getOrCreate().filter(predicate); } @Override public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { - return getDecoratedStream().map(mapper); + return getOrCreate().map(mapper); } @Override public IntStream mapToInt(ToIntFunction<? super T> mapper) { - return getDecoratedStream().mapToInt(mapper); + return getOrCreate().mapToInt(mapper); } @Override public LongStream mapToLong(ToLongFunction<? super T> mapper) { - return getDecoratedStream().mapToLong(mapper); + return getOrCreate().mapToLong(mapper); } @Override public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { - return getDecoratedStream().mapToDouble(mapper); + return getOrCreate().mapToDouble(mapper); } @Override public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { - return getDecoratedStream().flatMap(mapper); + return getOrCreate().flatMap(mapper); } @Override public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { - return getDecoratedStream().flatMapToInt(mapper); + return getOrCreate().flatMapToInt(mapper); } @Override public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { - return getDecoratedStream().flatMapToLong(mapper); + return getOrCreate().flatMapToLong(mapper); } @Override public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { - return getDecoratedStream().flatMapToDouble(mapper); + return getOrCreate().flatMapToDouble(mapper); } @Override public Stream<T> distinct() { - return getDecoratedStream().distinct(); + return getOrCreate().distinct(); } @Override public Stream<T> sorted() { - return getDecoratedStream().sorted(); + return getOrCreate().sorted(); } @Override public Stream<T> sorted(Comparator<? super T> comparator) { - return getDecoratedStream().sorted(comparator); + return getOrCreate().sorted(comparator); } @Override public Stream<T> peek(Consumer<? super T> action) { - return getDecoratedStream().peek(action); + return getOrCreate().peek(action); } @Override public Stream<T> limit(long maxSize) { - return getDecoratedStream().limit(maxSize); + return getOrCreate().limit(maxSize); } @Override public Stream<T> skip(long n) { - return getDecoratedStream().skip(n); + return getOrCreate().skip(n); } /* @@ -112,128 +110,86 @@ public abstract class StreamDecoration<T> implements Stream<T> { @Override public void forEach(Consumer<? super T> action) { - getDecoratedStream().forEach(action); + getOrCreate().forEach(action); } @Override public void forEachOrdered(Consumer<? super T> action) { - getDecoratedStream().forEachOrdered(action); + getOrCreate().forEachOrdered(action); } @Override public Object[] toArray() { - return getDecoratedStream().toArray(); + return getOrCreate().toArray(); } @Override public <A> A[] toArray(IntFunction<A[]> generator) { - return getDecoratedStream().toArray(generator); + return getOrCreate().toArray(generator); } @Override public T reduce(T identity, BinaryOperator<T> accumulator) { - return getDecoratedStream().reduce(identity, accumulator); + return getOrCreate().reduce(identity, accumulator); } @Override public Optional<T> reduce(BinaryOperator<T> accumulator) { - return getDecoratedStream().reduce(accumulator); + return getOrCreate().reduce(accumulator); } @Override public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { - return getDecoratedStream().reduce(identity, accumulator, combiner); + return getOrCreate().reduce(identity, accumulator, combiner); } @Override public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { - return getDecoratedStream().collect(supplier, accumulator, combiner); + return getOrCreate().collect(supplier, accumulator, combiner); } @Override public <R, A> R collect(Collector<? super T, A, R> collector) { - return getDecoratedStream().collect(collector); + return getOrCreate().collect(collector); } @Override public Optional<T> min(Comparator<? super T> comparator) { - return getDecoratedStream().min(comparator); + return getOrCreate().min(comparator); } @Override public Optional<T> max(Comparator<? super T> comparator) { - return getDecoratedStream().max(comparator); + return getOrCreate().max(comparator); } @Override public long count() { - return getDecoratedStream().count(); + return getOrCreate().count(); } @Override public boolean anyMatch(Predicate<? super T> predicate) { - return getDecoratedStream().anyMatch(predicate); + return getOrCreate().anyMatch(predicate); } @Override public boolean allMatch(Predicate<? super T> predicate) { - return getDecoratedStream().allMatch(predicate); + return getOrCreate().allMatch(predicate); } @Override public boolean noneMatch(Predicate<? super T> predicate) { - return getDecoratedStream().noneMatch(predicate); + return getOrCreate().noneMatch(predicate); } @Override public Optional<T> findFirst() { - return getDecoratedStream().findFirst(); + return getOrCreate().findFirst(); } @Override public Optional<T> findAny() { - return getDecoratedStream().findAny(); + return getOrCreate().findAny(); } - - @Override - public Iterator<T> iterator() { - return getDecoratedStream().iterator(); - } - - @Override - public Spliterator<T> spliterator() { - return getDecoratedStream().spliterator(); - } - - @Override - public boolean isParallel() { - return getDecoratedStream().isParallel(); - } - - @Override - public Stream<T> sequential() { - return getDecoratedStream().sequential(); - } - - @Override - public Stream<T> parallel() { - return getDecoratedStream().parallel(); - } - - @Override - public Stream<T> unordered() { - return getDecoratedStream().unordered(); - } - - @Override - public Stream<T> onClose(Runnable closeHandler) { - return getDecoratedStream().onClose(closeHandler); - } - - @Override - public void close() { - getDecoratedStream().close(); - } - - protected abstract Stream<T> getDecoratedStream(); } diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java index 7e948a3..88c59dd 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -7,6 +7,8 @@ import java.sql.Statement; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.DoubleFunction; +import java.util.function.DoubleUnaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.ToDoubleFunction; @@ -20,6 +22,7 @@ import java.util.stream.StreamSupport; import org.opengis.feature.Feature; +import org.apache.sis.internal.util.DoubleStreamDecoration; import org.apache.sis.internal.util.StreamDecoration; import org.apache.sis.storage.DataStoreException; import org.apache.sis.util.collection.BackingStoreException; @@ -39,7 +42,7 @@ class StreamSQL extends StreamDecoration<Feature> { @Override public <R> Stream<R> map(Function<? super Feature, ? extends R> mapper) { - return super.map(mapper); + return new MappedStream<>(mapper, this); } @Override @@ -114,7 +117,7 @@ class StreamSQL extends StreamDecoration<Feature> { } @Override - protected synchronized Stream<Feature> getDecoratedStream() { + protected synchronized Stream<Feature> createDecoratedStream() { final AtomicReference<Connection> connectionRef = new AtomicReference<>(); return Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection())) .map(Supplier::get) @@ -141,4 +144,146 @@ class StreamSQL extends StreamDecoration<Feature> { } }; } + + private static class MappedStream<I, O> extends StreamDecoration<O> { + private final Function<? super I, ? extends O> mapper; + Stream<I> source; + + private MappedStream(Function<? super I, ? extends O> mapper, Stream<I> source) { + this.mapper = mapper; + this.source = source; + } + + @Override + public Stream<O> distinct() { + source = source.distinct(); + return this; + } + + @Override + public Stream<O> limit(long maxSize) { + source = source.limit(maxSize); + return this; + } + + @Override + public Stream<O> skip(long n) { + source = source.skip(n); + return this; + } + + @Override + public long count() { + return source.count(); + } + + @Override + public boolean isParallel() { + return source.isParallel(); + } + + @Override + public Stream<O> sequential() { + source = source.sequential(); + return this; + } + + @Override + public Stream<O> parallel() { + source = source.parallel(); + return this; + } + + @Override + public <R> Stream<R> map(Function<? super O, ? extends R> mapper) { + return new MappedStream<>(this.mapper.andThen(mapper), source); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction<? super O> mapper) { + return new ToDoubleStream<I>(source, i -> mapper.applyAsDouble(this.mapper.apply(i))); + } + + @Override + protected Stream<O> createDecoratedStream() { + // Break possible infinite loop by sinking source content through its spliterator (terminal op). + final Stream<I> sink = StreamSupport.stream(source.spliterator(), source.isParallel()); + sink.onClose(source::close); + return sink.map(mapper); + } + } + + private static class ToDoubleStream<T> extends DoubleStreamDecoration { + + Stream<T> source; + final ToDoubleFunction<T> toDouble; + + private ToDoubleStream(Stream<T> source, ToDoubleFunction<T> toDouble) { + this.source = source; + this.toDouble = toDouble; + } + + @Override + public DoubleStream map(DoubleUnaryOperator mapper) { + return new ToDoubleStream<T>(source, t -> mapper.applyAsDouble(toDouble.applyAsDouble(t))); + } + + @Override + public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { + return new MappedStream<T, U>(t -> mapper.apply(toDouble.applyAsDouble(t)), source); + } + + @Override + public DoubleStream distinct() { + source = source.distinct(); + return this; + } + + @Override + public DoubleStream limit(long maxSize) { + source = source.limit(maxSize); + return this; + } + + @Override + public DoubleStream skip(long n) { + source = source.skip(n); + return this; + } + + @Override + public long count() { + return source.count(); + } + + @Override + public Stream<Double> boxed() { + return new MappedStream<>(t -> Double.valueOf(toDouble.applyAsDouble(t)), source); + } + + @Override + public boolean isParallel() { + return source.isParallel(); + } + + @Override + public DoubleStream sequential() { + source = source.sequential(); + return this; + } + + @Override + public DoubleStream parallel() { + source = source.parallel(); + return this; + } + + @Override + protected DoubleStream createDecoratedStream() { + // Break possible cycle by sinking source content through its spliterator (terminal op). + final Stream<T> sink = StreamSupport.stream(source.spliterator(), source.isParallel()); + sink.onClose(source::close); + return sink.mapToDouble(toDouble); + } + } }
