This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 569f505ba48682022ad445a6be4194430c27a787 Author: Alexis Manin <[email protected]> AuthorDate: Wed Aug 14 16:30:31 2019 +0200 refactor(SQL-Store): try to improve count operation by overriding returned stream. --- .../apache/sis/internal/util/DecoratedStream.java | 243 +++++++++++++++++++++ .../org/apache/sis/internal/sql/feature/Table.java | 124 ++++++++--- 2 files changed, 333 insertions(+), 34 deletions(-) diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java new file mode 100644 index 0000000..eb398d9 --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java @@ -0,0 +1,243 @@ +package org.apache.sis.internal.util; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collector; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +public class DecoratedStream<T> implements Stream<T> { + + final Stream<T> source; + + protected DecoratedStream(Stream<T> source) { + this.source = source; + } + + @Override + public Stream<T> filter(Predicate<? super T> predicate) { + return source.filter(predicate); + } + + @Override + public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { + return source.map(mapper); + } + + @Override + public IntStream mapToInt(ToIntFunction<? super T> mapper) { + return source.mapToInt(mapper); + } + + @Override + public LongStream mapToLong(ToLongFunction<? super T> mapper) { + return source.mapToLong(mapper); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { + return source.mapToDouble(mapper); + } + + @Override + public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { + return source.flatMap(mapper); + } + + @Override + public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { + return source.flatMapToInt(mapper); + } + + @Override + public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { + return source.flatMapToLong(mapper); + } + + @Override + public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { + return source.flatMapToDouble(mapper); + } + + @Override + public Stream<T> distinct() { + return source.distinct(); + } + + @Override + public Stream<T> sorted() { + return source.sorted(); + } + + @Override + public Stream<T> sorted(Comparator<? super T> comparator) { + return source.sorted(comparator); + } + + @Override + public Stream<T> peek(Consumer<? super T> action) { + return source.peek(action); + } + + @Override + public Stream<T> limit(long maxSize) { + return source.limit(maxSize); + } + + @Override + public Stream<T> skip(long n) { + return source.skip(n); + } + +/* + @Override + public Stream<T> takeWhile(Predicate<? super T> predicate) { + return source.takeWhile(predicate); + } + + @Override + public Stream<T> dropWhile(Predicate<? super T> predicate) { + return source.dropWhile(predicate); + } +*/ + + @Override + public void forEach(Consumer<? super T> action) { + source.forEach(action); + } + + @Override + public void forEachOrdered(Consumer<? super T> action) { + source.forEachOrdered(action); + } + + @Override + public Object[] toArray() { + return source.toArray(); + } + + @Override + public <A> A[] toArray(IntFunction<A[]> generator) { + return source.toArray(generator); + } + + @Override + public T reduce(T identity, BinaryOperator<T> accumulator) { + return source.reduce(identity, accumulator); + } + + @Override + public Optional<T> reduce(BinaryOperator<T> accumulator) { + return source.reduce(accumulator); + } + + @Override + public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { + return source.reduce(identity, accumulator, combiner); + } + + @Override + public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { + return source.collect(supplier, accumulator, combiner); + } + + @Override + public <R, A> R collect(Collector<? super T, A, R> collector) { + return source.collect(collector); + } + + @Override + public Optional<T> min(Comparator<? super T> comparator) { + return source.min(comparator); + } + + @Override + public Optional<T> max(Comparator<? super T> comparator) { + return source.max(comparator); + } + + @Override + public long count() { + return source.count(); + } + + @Override + public boolean anyMatch(Predicate<? super T> predicate) { + return source.anyMatch(predicate); + } + + @Override + public boolean allMatch(Predicate<? super T> predicate) { + return source.allMatch(predicate); + } + + @Override + public boolean noneMatch(Predicate<? super T> predicate) { + return source.noneMatch(predicate); + } + + @Override + public Optional<T> findFirst() { + return source.findFirst(); + } + + @Override + public Optional<T> findAny() { + return source.findAny(); + } + + @Override + public Iterator<T> iterator() { + return source.iterator(); + } + + @Override + public Spliterator<T> spliterator() { + return source.spliterator(); + } + + @Override + public boolean isParallel() { + return source.isParallel(); + } + + @Override + public Stream<T> sequential() { + return source.sequential(); + } + + @Override + public Stream<T> parallel() { + return source.parallel(); + } + + @Override + public Stream<T> unordered() { + return source.unordered(); + } + + @Override + public Stream<T> onClose(Runnable closeHandler) { + return source.onClose(closeHandler); + } + + @Override + public void close() { + source.close(); + } +} diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java index 8c2f35a..380669b 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java @@ -16,47 +16,55 @@ */ package org.apache.sis.internal.sql.feature; -import java.util.Map; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.ArrayList; -import java.util.Collection; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import java.sql.DatabaseMetaData; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; import javax.sql.DataSource; -import org.opengis.util.GenericName; + +import org.opengis.feature.AttributeType; +import org.opengis.feature.Feature; +import org.opengis.feature.FeatureAssociationRole; +import org.opengis.feature.FeatureType; import org.opengis.referencing.crs.CoordinateReferenceSystem; +import org.opengis.util.GenericName; + +import org.apache.sis.feature.builder.AssociationRoleBuilder; import org.apache.sis.feature.builder.AttributeRole; import org.apache.sis.feature.builder.AttributeTypeBuilder; -import org.apache.sis.feature.builder.AssociationRoleBuilder; import org.apache.sis.feature.builder.FeatureTypeBuilder; import org.apache.sis.internal.feature.Geometries; -import org.apache.sis.storage.DataStoreException; -import org.apache.sis.storage.DataStoreContentException; -import org.apache.sis.storage.InternalDataStoreException; import org.apache.sis.internal.metadata.sql.Reflection; +import org.apache.sis.internal.metadata.sql.SQLBuilder; import org.apache.sis.internal.metadata.sql.SQLUtilities; import org.apache.sis.internal.storage.AbstractFeatureSet; import org.apache.sis.internal.util.CollectionsExt; -import org.apache.sis.util.collection.WeakValueHashMap; -import org.apache.sis.util.collection.TreeTable; +import org.apache.sis.internal.util.DecoratedStream; +import org.apache.sis.storage.DataStoreContentException; +import org.apache.sis.storage.DataStoreException; +import org.apache.sis.storage.InternalDataStoreException; import org.apache.sis.util.CharSequences; -import org.apache.sis.util.Exceptions; import org.apache.sis.util.Classes; -import org.apache.sis.util.Numbers; import org.apache.sis.util.Debug; +import org.apache.sis.util.Numbers; +import org.apache.sis.util.collection.BackingStoreException; +import org.apache.sis.util.collection.TreeTable; +import org.apache.sis.util.collection.WeakValueHashMap; // Branch-dependent imports -import org.opengis.feature.Feature; -import org.opengis.feature.FeatureType; -import org.opengis.feature.AttributeType; -import org.opengis.feature.FeatureAssociationRole; /** @@ -602,21 +610,30 @@ final class Table extends AbstractFeatureSet { */ @Override public Stream<Feature> features(final boolean parallel) throws DataStoreException { - DataStoreException ex; - Connection connection = null; - try { - connection = source.getConnection(); - final Features iter = features(connection, new ArrayList<>(), null); - return StreamSupport.stream(iter, parallel).onClose(iter); - } catch (SQLException cause) { - ex = new DataStoreException(Exceptions.unwrap(cause)); - } - if (connection != null) try { - connection.close(); - } catch (SQLException e) { - ex.addSuppressed(e); + final AtomicReference<Connection> connectionRef = new AtomicReference<>(); + final Stream<Feature> featureStream = Stream.generate(uncheck(() -> source.getConnection())) + .peek(connectionRef::set) + .flatMap(conn -> { + try { + final Features iter = features(conn, new ArrayList<>(), null); + return StreamSupport.stream(iter, parallel).onClose(iter); + } catch (SQLException | InternalDataStoreException e) { + throw new BackingStoreException(e); + } + }) + .onClose(() -> closeRef(connectionRef)); + return new CountOverload<>(featureStream); + } + + private void closeRef(final AtomicReference<Connection> ref) { + final Connection conn = ref.get(); + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + warning(e); + } } - throw ex; } /** @@ -631,4 +648,43 @@ final class Table extends AbstractFeatureSet { { return new Features(this, connection, attributeNames, attributeColumns, importedKeys, exportedKeys, following, noFollow); } + + private class CountOverload<T> extends DecoratedStream<T> { + + CountOverload(Stream<T> source) { + super(source); + } + + @Override + public long count() { + try (Connection conn = Table.this.source.getConnection()) { + final String query = new SQLBuilder(conn.getMetaData(), true) + .append("SELECT COUNT(") + .appendIdentifier(attributeColumns[0]) + .append(')') + .append(" FROM ") + .appendIdentifier(name.catalog, name.schema, name.table) + .toString(); + try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(query)) { + if (rs.next()) { + return rs.getLong(1); + } else return 0; + } + } catch (SQLException e) { + throw new BackingStoreException("Cannot estimate feature set size using SQL COUNT query", e); + } + } + } + + private static <T> Supplier<T> uncheck(final Callable<T> generator) { + return () -> { + try { + return generator.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new BackingStoreException(e); + } + }; + } }
