This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/sql-store in repository https://gitbox.apache.org/repos/asf/sis.git
commit 926af1a24482b5255de165f0dd0b491ca7bbbdb9 Author: Alexis Manin <[email protected]> AuthorDate: Wed Aug 28 18:15:05 2019 +0200 fix(SQLStore): fix connection auto-commit management for feature streaming. --- .../apache/sis/internal/sql/feature/Features.java | 15 +-------------- .../apache/sis/internal/sql/feature/StreamSQL.java | 21 ++++++++++++++++++--- .../org/apache/sis/internal/sql/feature/Table.java | 3 ++- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java index 0934bf7..a596e7b 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java @@ -57,7 +57,7 @@ import static org.apache.sis.util.ArgumentChecks.ensureNonNull; * @since 1.0 * @module */ -final class Features implements Spliterator<Feature>, Runnable { +final class Features implements Spliterator<Feature> { /** * An empty array of iterators, used when there is no dependency. */ @@ -531,19 +531,6 @@ final class Features implements Spliterator<Feature>, Runnable { } /** - * Closes the (pooled) connection, including the statements of all dependencies. - * This is a handler to be invoked by {@link java.util.stream.Stream#close()}. - */ - @Override - public void run() { - try { - close(); - } catch (SQLException e) { - throw new BackingStoreException(e); - } - } - - /** * Useful to customiez value retrieval on result sets. Example: * {@code * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt; diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java index f6d8c3d..7a295b2 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java @@ -38,6 +38,8 @@ import java.util.stream.LongStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import javax.sql.DataSource; + import org.opengis.feature.Feature; import org.apache.sis.internal.util.DoubleStreamDecoration; @@ -163,23 +165,36 @@ class StreamSQL extends StreamDecoration<Feature> { @Override protected synchronized Stream<Feature> createDecoratedStream() { final AtomicReference<Connection> connectionRef = new AtomicReference<>(); - Stream<Feature> featureStream = Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection())) + Stream<Feature> featureStream = Stream.of(uncheck(this::connectNoAuto)) .map(Supplier::get) .peek(connectionRef::set) .flatMap(conn -> { try { final Features iter = queryBuilder.build(conn); - return StreamSupport.stream(iter, parallel).onClose(iter); + return StreamSupport.stream(iter, parallel); } catch (SQLException | DataStoreException e) { throw new BackingStoreException(e); } }) - .onClose(() -> queryBuilder.parent.closeRef(connectionRef)); + .onClose(() -> queryBuilder.parent.closeRef(connectionRef, true)); if (peekAction != null) featureStream = featureStream.peek(peekAction); return featureStream; } /** + * Acquire a connection over {@link Table parent table} database, forcing + * {@link Connection#setAutoCommit(boolean) auto-commit} to false. + * + * @return A new connection to {@link Table parent table} database, with deactivated auto-commit. + * @throws SQLException If we cannot create a new connection. See {@link DataSource#getConnection()} for details. + */ + private Connection connectNoAuto() throws SQLException { + final Connection conn = queryBuilder.parent.source.getConnection(); + conn.setAutoCommit(false); + return conn; + } + + /** * Transform a callable into supplier by catching any potential verified exception and rethrowing it as a {@link BackingStoreException}. * @param generator The callable to use in a non-verified error context. Must not be null. * @param <T> The return type of input callable. diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java index 321561c..5a9c555 100644 --- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java +++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java @@ -606,10 +606,11 @@ final class Table extends AbstractFeatureSet { return new StreamSQL(this); } - void closeRef(final AtomicReference<Connection> ref) { + void closeRef(final AtomicReference<Connection> ref, boolean forceRollback) { final Connection conn = ref.get(); if (conn != null) { try { + if (forceRollback) conn.rollback(); conn.close(); } catch (SQLException e) { warning(e);
