This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch ignite-24053 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit c4a504bf2f9bd001a8cda2ec2e95412aecf2ecf9 Author: Pavel Pereslegin <[email protected]> AuthorDate: Fri Jan 17 15:06:44 2025 +0300 IGNITE-24053 Multiple connections support. --- .../jdbc/proto/event/JdbcBatchExecuteRequest.java | 2 +- .../proto/event/JdbcBatchPreparedStmntRequest.java | 2 +- .../jdbc/proto/event/JdbcFinishTxResult.java | 22 +++- ...hTxResult.java => JdbcObservableTimeAware.java} | 25 ++--- .../jdbc/proto/event/JdbcQueryExecuteRequest.java | 14 ++- .../ignite/internal/jdbc/proto/event/Response.java | 7 +- .../handler/ClientInboundMessageHandler.java | 4 - .../ignite/client/handler/JdbcHandlerBase.java | 5 - .../client/handler/JdbcQueryEventHandlerImpl.java | 121 ++++++++++++++------- .../jdbc/ClientJdbcExecuteBatchRequest.java | 4 +- .../requests/jdbc/ClientJdbcExecuteRequest.java | 5 +- .../requests/jdbc/ClientJdbcFinishTxRequest.java | 2 +- .../jdbc/ClientJdbcPreparedStmntBatchRequest.java | 4 +- .../sql/ClientSqlExecuteScriptRequest.java | 1 + .../handler/JdbcQueryEventHandlerImplTest.java | 2 +- .../ignite/internal/client/ReliableChannel.java | 6 +- .../ignite/internal/client/TcpIgniteClient.java | 30 ++++- .../ignite/jdbc/ItJdbcMultipleConnectionsTest.java | 97 +++++++++++++++++ .../ignite/internal/jdbc/JdbcConnection.java | 38 +++++-- .../apache/ignite/internal/jdbc/JdbcStatement.java | 3 +- .../org/apache/ignite/jdbc/IgniteJdbcDriver.java | 5 +- .../ignite/internal/tx/HybridTimestampTracker.java | 30 +++++ 22 files changed, 329 insertions(+), 100 deletions(-) diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java index 0994126b83..d795458415 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java +++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.util.CollectionUtils; /** * JDBC batch execute request. */ -public class JdbcBatchExecuteRequest implements ClientMessage { +public class JdbcBatchExecuteRequest extends JdbcObservableTimeAware implements ClientMessage { /** Schema name. */ private String schemaName; diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java index 6391b29a17..87781e1b7e 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.CollectionUtils; /** * JDBC prepared statement query batch execute request. */ -public class JdbcBatchPreparedStmntRequest implements ClientMessage { +public class JdbcBatchPreparedStmntRequest extends JdbcObservableTimeAware implements ClientMessage { /** Schema name. 
*/ private String schemaName; diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java index 5c6cbfa6f2..da32243e61 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java @@ -17,15 +17,29 @@ package org.apache.ignite.internal.jdbc.proto.event; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.jetbrains.annotations.Nullable; + /** * Result of commit/rollback command. */ public class JdbcFinishTxResult extends Response { + /** Observable timestamp used only on server side. */ + @SuppressWarnings("TransientFieldInNonSerializableClass") + private final transient @Nullable HybridTimestamp observableTime; + /** * Default constructor is used for deserialization. */ public JdbcFinishTxResult() { - // No-op. + this.observableTime = null; + } + + /** + * Constructor. 
+ */ + public JdbcFinishTxResult(@Nullable HybridTimestamp observableTime) { + this.observableTime = observableTime; } /** @@ -36,5 +50,11 @@ public class JdbcFinishTxResult extends Response { */ public JdbcFinishTxResult(int status, String err) { super(status, err); + + this.observableTime = null; + } + + public @Nullable Object observableTime() { + return observableTime; } } diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcObservableTimeAware.java similarity index 64% copy from modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java copy to modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcObservableTimeAware.java index 5c6cbfa6f2..4d0ae5855e 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcObservableTimeAware.java @@ -17,24 +17,17 @@ package org.apache.ignite.internal.jdbc.proto.event; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.hlc.HybridTimestamp; + /** - * Result of commit/rollback command. + * TODO fix naming add description. */ -public class JdbcFinishTxResult extends Response { - /** - * Default constructor is used for deserialization. - */ - public JdbcFinishTxResult() { - // No-op. - } +abstract class JdbcObservableTimeAware { + @SuppressWarnings("TransientFieldInNonSerializableClass") + private final transient AtomicReference<HybridTimestamp> observableTimeUpdatesHolder = new AtomicReference<>(); - /** - * Constructor. - * - * @param status Status code. - * @param err Error message. 
- */ - public JdbcFinishTxResult(int status, String err) { - super(status, err); + public AtomicReference<HybridTimestamp> observableTimeUpdatesHolder() { + return observableTimeUpdatesHolder; } } diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java index cd48d51d1f..3658f4efa5 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.tostring.S; /** * JDBC query execute request. */ -public class JdbcQueryExecuteRequest implements ClientMessage { +public class JdbcQueryExecuteRequest extends JdbcObservableTimeAware implements ClientMessage { /** Expected statement type. */ private JdbcStatementType stmtType; @@ -61,6 +61,8 @@ public class JdbcQueryExecuteRequest implements ClientMessage { */ private long correlationToken; + private long observableTime; + /** * Default constructor. For deserialization purposes. 
*/ @@ -91,7 +93,8 @@ public class JdbcQueryExecuteRequest implements ClientMessage { boolean autoCommit, boolean multiStatement, long queryTimeoutMillis, - long correlationToken + long correlationToken, + long observableTime ) { Objects.requireNonNull(stmtType); @@ -105,6 +108,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage { this.multiStatement = multiStatement; this.queryTimeoutMillis = queryTimeoutMillis; this.correlationToken = correlationToken; + this.observableTime = observableTime; } /** @@ -190,6 +194,10 @@ public class JdbcQueryExecuteRequest implements ClientMessage { return correlationToken; } + public long observableTime() { + return observableTime; + } + /** {@inheritDoc} */ @Override public void writeBinary(ClientMessagePacker packer) { @@ -204,6 +212,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage { packer.packObjectArrayAsBinaryTuple(args); packer.packLong(queryTimeoutMillis); packer.packLong(correlationToken); + packer.packLong(observableTime); } /** {@inheritDoc} */ @@ -220,6 +229,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage { args = unpacker.unpackObjectArrayFromBinaryTuple(); queryTimeoutMillis = unpacker.unpackLong(); correlationToken = unpacker.unpackLong(); + observableTime = unpacker.unpackLong(); } /** {@inheritDoc} */ diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java index 9e625e2827..618e50cb1c 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java @@ -39,9 +39,6 @@ public abstract class Response implements ClientMessage { /** Error. */ private String err; - /** Has results. */ - protected boolean hasResults; - /** * Constructs successful response. 
*/ @@ -121,9 +118,9 @@ public abstract class Response implements ClientMessage { } /** - * Gets hasResults flag. + * Gets success status. * - * @return Has results. + * @return {@code True} if command succeeded, {@code false} otherwise. */ public boolean success() { return status == STATUS_SUCCESS; diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index 0744e8be94..ea8d53ff98 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -713,10 +713,6 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im return ClientTupleContainsAllKeysRequest.process(in, out, igniteTables, resources, txManager); case ClientOp.JDBC_CONNECT: - // TODO: IGNITE-24053 JDBC request ought to contain the client observation timestamp. 
- jdbcQueryEventHandler.getTimestampTracker().update(clockService.current()); - out.meta(jdbcQueryEventHandler.getTimestampTracker().get()); - return ClientJdbcConnectRequest.execute(in, out, jdbcQueryEventHandler); case ClientOp.JDBC_EXEC: diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java index a3a54a986d..f1abedb91b 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java @@ -19,7 +19,6 @@ package org.apache.ignite.client.handler; import static org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode.UNSUPPORTED_OPERATION; import static org.apache.ignite.internal.sql.engine.SqlQueryType.DDL; -import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML; import static org.apache.ignite.internal.sql.engine.SqlQueryType.KILL; import static org.apache.ignite.internal.sql.engine.SqlQueryType.TX_CONTROL; @@ -49,16 +48,12 @@ import org.jetbrains.annotations.Nullable; * Contains common methods used to process jdbc requests. */ abstract class JdbcHandlerBase { - /** {@link SqlQueryType}s allowed in JDBC select statements. **/ public static final Set<SqlQueryType> SELECT_STATEMENT_QUERIES = Set.of( SqlQueryType.QUERY, SqlQueryType.EXPLAIN ); - /** {@link SqlQueryType}s allowed in JDBC update statements. **/ - public static final Set<SqlQueryType> UPDATE_STATEMENT_QUERIES = Set.of(DML, DDL, KILL); - /** {@link SqlQueryType}s types that return 0 in executeUpdate and execute / getUpdateCount. 
**/ public static final Set<SqlQueryType> ZERO_UPDATE_COUNT_QUERIES = Set.of(DDL, KILL, TX_CONTROL); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java index 3662ce2abc..0711a14427 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java @@ -34,9 +34,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog; import org.apache.ignite.client.handler.requests.jdbc.JdbcQueryCursor; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler; import org.apache.ignite.internal.jdbc.proto.JdbcStatementType; import org.apache.ignite.internal.jdbc.proto.event.JdbcBatchExecuteRequest; @@ -80,12 +82,6 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu /** {@link SqlQueryType}s allowed in JDBC update statements. **/ public static final Set<SqlQueryType> UPDATE_STATEMENT_QUERIES = Set.of(DML, SqlQueryType.DDL, SqlQueryType.KILL); - /** - * Observation timestamp tracker. - * TODO: IGNITE-24053 Remove this after the issue will be fixed. - * */ - private final HybridTimestampTracker timestampTracker = HybridTimestampTracker.atomicTracker(null); - /** Sql query processor. 
*/ private final QueryProcessor processor; @@ -138,6 +134,28 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu } } + static class ObservableTimeContext { + private final @Nullable InternalTransaction transaction; + private final HybridTimestampTracker tracker; + + private ObservableTimeContext(HybridTimestampTracker tracker) { + this(tracker, null); + } + + private ObservableTimeContext(HybridTimestampTracker tracker, @Nullable InternalTransaction transaction) { + this.transaction = transaction; + this.tracker = tracker; + } + + @Nullable InternalTransaction transaction() { + return transaction; + } + + HybridTimestampTracker tracker() { + return tracker; + } + } + /** {@inheritDoc} */ @Override public CompletableFuture<? extends Response> queryAsync(long connectionId, JdbcQueryExecuteRequest req) { @@ -157,7 +175,11 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu long correlationToken = req.correlationToken(); CancellationToken token = connectionContext.registerExecution(correlationToken); - InternalTransaction tx = req.autoCommit() ? null : connectionContext.getOrStartTransaction(timestampTracker); + ObservableTimeContext timeContext = connectionContext.getOrCreateObservableTimeContext( + req.autoCommit(), + HybridTimestamp.nullableHybridTimestamp(req.observableTime()), + req.observableTimeUpdatesHolder() + ); JdbcStatementType reqStmtType = req.getStmtType(); boolean multiStatement = req.multiStatement(); @@ -168,8 +190,8 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu CompletableFuture<AsyncSqlCursor<InternalSqlRow>> result = processor.queryAsync( properties, - timestampTracker, - tx, + timeContext.tracker(), + timeContext.transaction(), token, req.sqlQuery(), req.arguments() == null ? 
OBJECT_EMPTY_ARRAY : req.arguments() @@ -181,10 +203,6 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu .exceptionally(t -> createErrorResult("Exception while executing query.", t, null)); } - public HybridTimestampTracker getTimestampTracker() { - return timestampTracker; - } - private static SqlProperties createProperties( JdbcStatementType stmtType, boolean multiStatement, @@ -224,7 +242,11 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu return CompletableFuture.completedFuture(new JdbcBatchExecuteResult(Response.STATUS_FAILED, "Connection is broken")); } - InternalTransaction tx = req.autoCommit() ? null : connectionContext.getOrStartTransaction(timestampTracker); + ObservableTimeContext timeContext = connectionContext.getOrCreateObservableTimeContext( + req.autoCommit(), + null, + req.observableTimeUpdatesHolder() + ); long correlationToken = req.correlationToken(); CancellationToken token = connectionContext.registerExecution(correlationToken); var queries = req.queries(); @@ -235,7 +257,7 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu for (String query : queries) { tail = tail.thenCompose(list -> executeAndCollectUpdateCount( connectionContext, - tx, + timeContext, token, query, OBJECT_EMPTY_ARRAY, @@ -265,7 +287,11 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu return CompletableFuture.completedFuture(new JdbcBatchExecuteResult(Response.STATUS_FAILED, "Connection is broken")); } - InternalTransaction tx = req.autoCommit() ? 
null : connectionContext.getOrStartTransaction(timestampTracker); + ObservableTimeContext timeContext = connectionContext.getOrCreateObservableTimeContext( + req.autoCommit(), + null, + req.observableTimeUpdatesHolder() + ); long correlationToken = req.correlationToken(); CancellationToken token = connectionContext.registerExecution(correlationToken); var argList = req.getArgs(); @@ -275,7 +301,7 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu for (Object[] args : argList) { tail = tail.thenCompose(list -> executeAndCollectUpdateCount( - connectionContext, tx, token, req.getQuery(), args, timeoutMillis, list + connectionContext, timeContext, token, req.getQuery(), args, timeoutMillis, list )); } @@ -292,7 +318,7 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu private CompletableFuture<IntArrayList> executeAndCollectUpdateCount( JdbcConnectionContext context, - @Nullable InternalTransaction tx, + ObservableTimeContext timeContext, CancellationToken token, String sql, Object[] arg, @@ -307,8 +333,8 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu CompletableFuture<AsyncSqlCursor<InternalSqlRow>> result = processor.queryAsync( properties, - timestampTracker, - tx, + timeContext.tracker(), + timeContext.transaction(), token, sql, arg == null ? 
OBJECT_EMPTY_ARRAY : arg @@ -389,12 +415,12 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu return CompletableFuture.completedFuture(new JdbcFinishTxResult(Response.STATUS_FAILED, "Connection is broken")); } - return connectionContext.finishTransactionAsync(commit).handle((ignored, t) -> { + return connectionContext.finishTransactionAsync(commit).handle((observableTime, t) -> { if (t != null) { return new JdbcFinishTxResult(Response.STATUS_FAILED, t.getMessage()); } - return new JdbcFinishTxResult(); + return new JdbcFinishTxResult(observableTime); }); } @@ -429,7 +455,7 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu private final ConcurrentMap<Long, CancelHandle> cancelHandles = new ConcurrentHashMap<>(); - private @Nullable InternalTransaction tx; + private @Nullable ObservableTimeContext observableTimeContext; JdbcConnectionContext( TxManager txManager, @@ -443,16 +469,30 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu return timeZoneId; } - /** - * Gets the transaction associated with the current connection, starts a new one if it doesn't already exist. - * - * <p>NOTE: this method is not thread-safe and should only be called by a single thread. - * - * @param timestampProvider Observation timestamp provider. - * @return Transaction associated with the current connection. - */ - InternalTransaction getOrStartTransaction(HybridTimestampTracker timestampProvider) { - return tx == null ? 
tx = txManager.begin(timestampProvider, false) : tx; + ObservableTimeContext getOrCreateObservableTimeContext( + boolean useImplicitTx, + @Nullable HybridTimestamp timeFromClient, + AtomicReference<HybridTimestamp> observableTimeHolder + ) { + if (observableTimeContext == null) { + if (useImplicitTx) { + return new ObservableTimeContext(HybridTimestampTracker.atomicClientTracker( + timeFromClient, + observableTimeHolder::set + )); + } else { + HybridTimestampTracker tracker = HybridTimestampTracker.atomicTracker(null); + + observableTimeContext = new ObservableTimeContext( + tracker, + txManager.begin(tracker, false) + ); + } + + return observableTimeContext; + } + + return observableTimeContext; } /** @@ -463,16 +503,23 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu * @param commit {@code True} to commit, {@code false} to rollback. * @return Future that represents the pending completion of the operation. */ - CompletableFuture<Void> finishTransactionAsync(boolean commit) { - InternalTransaction tx0 = tx; + CompletableFuture<@Nullable HybridTimestamp> finishTransactionAsync(boolean commit) { + if (observableTimeContext == null) { + return nullCompletedFuture(); + } + + InternalTransaction tx0 = observableTimeContext.transaction(); + HybridTimestampTracker tracker = observableTimeContext.tracker(); - tx = null; + observableTimeContext = null; if (tx0 == null) { return nullCompletedFuture(); } - return commit ? tx0.commitAsync() : tx0.rollbackAsync(); + return commit + ? 
tx0.commitAsync().thenApply(ignore -> tracker.get()) + : tx0.rollbackAsync().thenApply(ignore -> null); } boolean valid() { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java index d21534938f..433a13f62b 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java @@ -47,9 +47,7 @@ public class ClientJdbcExecuteBatchRequest { req.readBinary(in); return handler.batchAsync(connectionId, req).thenAccept(res -> { - if (req.autoCommit()) { - out.meta(handler.getTimestampTracker().get()); - } + out.meta(req.observableTimeUpdatesHolder().get()); res.writeBinary(out); }); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java index 8e527272df..6baaed2579 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java @@ -41,15 +41,12 @@ public class ClientJdbcExecuteRequest { JdbcQueryEventHandlerImpl handler ) { var req = new JdbcQueryExecuteRequest(); - long connectionId = in.unpackLong(); req.readBinary(in); return handler.queryAsync(connectionId, req).thenAccept(res -> { - if (req.autoCommit()) { - out.meta(handler.getTimestampTracker().get()); - } + out.meta(req.observableTimeUpdatesHolder().get()); res.writeBinary(out); }); diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java index b1982a26e6..7aeb903b3c 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java @@ -44,7 +44,7 @@ public class ClientJdbcFinishTxRequest { return handler.finishTxAsync(connectionId, commit).thenAccept(res -> { if (commit) { - out.meta(handler.getTimestampTracker().get()); + out.meta(res.observableTime()); } res.writeBinary(out); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java index 4bd46661a3..00ef7114c4 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java @@ -47,9 +47,7 @@ public class ClientJdbcPreparedStmntBatchRequest { req.readBinary(in); return handler.batchPrepStatementAsync(connectionId, req).thenAccept(res -> { - if (req.autoCommit()) { - out.meta(handler.getTimestampTracker().get()); - } + out.meta(req.observableTimeUpdatesHolder().get()); res.writeBinary(out); }); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java index 328d39b1a0..00a6712927 100644 --- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java @@ -50,6 +50,7 @@ public class ClientSqlExecuteScriptRequest { } HybridTimestamp clientTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong()); + // TODO Thin client script execution is broken - should return observable time to client. var tsUpdater = HybridTimestampTracker.clientTracker(clientTs, ts -> {}); // TODO https://issues.apache.org/jira/browse/IGNITE-23646 Pass cancellation token to the query processor. diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java index 69f7e6df60..8e2c540c9d 100644 --- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java +++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java @@ -302,7 +302,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { private static JdbcQueryExecuteRequest createExecuteRequest(String schema, String query, JdbcStatementType type) { //noinspection DataFlowIssue return new JdbcQueryExecuteRequest( - type, schema, 1024, 1, query, null, false, false, 0, System.currentTimeMillis() + type, schema, 1024, 1, query, null, false, false, 0, System.currentTimeMillis(), 0 ); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java index 1dc83e40f0..75d4dedaea 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java @@ -115,7 
+115,7 @@ public final class ReliableChannel implements AutoCloseable { private final AtomicLong partitionAssignmentTimestamp = new AtomicLong(); /** Observable timestamp, or causality token. Sent by the server with every response, and required by some requests. */ - private final AtomicLong observableTimestamp = new AtomicLong(); + private final AtomicLong observableTimestamp; /** Cluster id from the first handshake. */ private final AtomicReference<UUID> clusterId = new AtomicReference<>(); @@ -133,11 +133,13 @@ public final class ReliableChannel implements AutoCloseable { ReliableChannel( ClientChannelFactory chFactory, IgniteClientConfiguration clientCfg, - ClientMetricSource metrics) { + ClientMetricSource metrics, + @Nullable AtomicLong tracker) { this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg"); this.chFactory = Objects.requireNonNull(chFactory, "chFactory"); this.log = ClientUtils.logger(clientCfg, ReliableChannel.class); this.metrics = metrics; + this.observableTimestamp = tracker == null ? new AtomicLong() : tracker; connMgr = new NettyClientConnectionMultiplexer(metrics); connMgr.start(clientCfg); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java index 0de3774a83..814a7b26c2 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.catalog.IgniteCatalog; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.client.IgniteClientConfiguration; @@ -95,7 +96,11 @@ public class TcpIgniteClient implements IgniteClient { * @param cfg Config. 
*/ private TcpIgniteClient(IgniteClientConfiguration cfg) { - this(TcpClientChannel::createAsync, cfg); + this(TcpClientChannel::createAsync, cfg, null); + } + + private TcpIgniteClient(IgniteClientConfiguration cfg, @Nullable AtomicLong tracker) { + this(TcpClientChannel::createAsync, cfg, tracker); } /** @@ -104,14 +109,14 @@ public class TcpIgniteClient implements IgniteClient { * @param chFactory Channel factory. * @param cfg Config. */ - private TcpIgniteClient(ClientChannelFactory chFactory, IgniteClientConfiguration cfg) { + private TcpIgniteClient(ClientChannelFactory chFactory, IgniteClientConfiguration cfg, @Nullable AtomicLong tracker) { assert chFactory != null; assert cfg != null; this.cfg = cfg; metrics = new ClientMetricSource(); - ch = new ReliableChannel(chFactory, cfg, metrics); + ch = new ReliableChannel(chFactory, cfg, metrics, tracker); tables = new ClientTables(ch, marshallers); transactions = new ClientTransactions(ch); compute = new ClientCompute(ch, tables); @@ -166,6 +171,25 @@ public class TcpIgniteClient implements IgniteClient { } } + /** + * Initializes new instance of {@link IgniteClient} and establishes the connection. + * + * @param cfg Thin client configuration. + * @return Future representing pending completion of the operation. 
+ */ + public static CompletableFuture<IgniteClient> startAsync(IgniteClientConfiguration cfg, AtomicLong tracker) { + ErrorGroups.initialize(); + + try { + //noinspection resource: returned from method + var client = new TcpIgniteClient(cfg, tracker); + + return client.initAsync().thenApply(x -> client); + } catch (IgniteException e) { + return failedFuture(e); + } + } + /** {@inheritDoc} */ @Override public IgniteTables tables() { diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultipleConnectionsTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultipleConnectionsTest.java new file mode 100644 index 0000000000..b094a5335b --- /dev/null +++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultipleConnectionsTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.jdbc; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** + * Verifies that a row inserted through one JDBC connection is visible to the next query issued through another connection. + */ +public class ItJdbcMultipleConnectionsTest extends AbstractJdbcSelfTest { + + private static final String URL1 = "jdbc:ignite:thin://127.0.0.1:10800"; + private static final String URL2 = "jdbc:ignite:thin://127.0.0.1:10801"; + + @Override + protected int initialNodes() { + return 2; + } + + @AfterEach + public void dropTable() { + sql("DROP TABLE IF EXISTS T1"); + } + + @Test + public void testMultipleConnectionsSingleServer() throws SQLException { + try (Connection conn1 = DriverManager.getConnection(URL1); + Connection conn2 = DriverManager.getConnection(URL1)) { + conn1.setAutoCommit(true); + conn2.setAutoCommit(true); + + try (Statement stmt2 = conn2.createStatement()) { + try (Statement stmt1 = conn1.createStatement()) { + checkVisibility(stmt1, stmt2); + } + } + } + } + + @Test + public void testMultipleConnectionsMultipleServer() throws SQLException { + try (Connection conn1 = DriverManager.getConnection(URL1)) { + try (Connection conn2 = DriverManager.getConnection(URL2)) { + + conn1.setAutoCommit(true); + conn2.setAutoCommit(true); + + try (Statement stmt2 = conn2.createStatement()) { + try (Statement stmt1 = conn1.createStatement()) { + checkVisibility(stmt1, stmt2); + } + } + } + } + } + + private static void checkVisibility(Statement stmt1, Statement stmt2) throws SQLException { + stmt1.executeUpdate("CREATE TABLE T1(id INT PRIMARY KEY)"); + + int rowsCount = 100; + int i = 0; + + do { + assertThat(stmt1.executeUpdate("INSERT INTO T1 VALUES (" + i + ")"), is(1)); + + try (ResultSet rs = 
stmt2.executeQuery("SELECT count(*) FROM T1 WHERE id >= 0")) { + assertTrue(rs.next()); + assertThat("i=" + i, rs.getLong(1), is(i + 1L)); + } + } while (++i < rowsCount); + } +} diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java index 779942a718..5eb312d57d 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java @@ -23,6 +23,7 @@ import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; import static java.sql.ResultSet.TYPE_FORWARD_ONLY; import static org.apache.ignite.internal.jdbc.proto.SqlStateCode.CLIENT_CONNECTION_FAILED; import static org.apache.ignite.internal.jdbc.proto.SqlStateCode.CONNECTION_CLOSED; +import static org.apache.ignite.internal.util.ViewUtils.sync; import java.sql.Array; import java.sql.Blob; @@ -54,10 +55,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.client.BasicAuthenticator; -import org.apache.ignite.client.IgniteClient; import org.apache.ignite.client.IgniteClientAuthenticator; +import org.apache.ignite.client.IgniteClientConfiguration; import org.apache.ignite.client.SslConfiguration; import org.apache.ignite.internal.client.HostAndPort; +import org.apache.ignite.internal.client.IgniteClientConfigurationImpl; import org.apache.ignite.internal.client.TcpIgniteClient; import org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode; import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler; @@ -127,7 +129,7 @@ public class JdbcConnection implements Connection { * * @param props Connection properties. 
+ */ - public JdbcConnection(ConnectionProperties props) throws SQLException { + public JdbcConnection(ConnectionProperties props, AtomicLong tracker) throws SQLException { this.connProps = props; autoCommit = true; @@ -138,13 +140,7 @@ public class JdbcConnection implements Connection { qryTimeout = connProps.getQueryTimeout(); try { - client = ((TcpIgniteClient) IgniteClient.builder() - .addresses(addrs) - .connectTimeout(netTimeout) - .ssl(extractSslConfiguration(connProps)) - .authenticator(extractAuthenticationConfiguration(connProps)) - .build()); - + client = buildClient(addrs, tracker); } catch (Exception e) { throw new SQLException("Failed to connect to server", CLIENT_CONNECTION_FAILED, e); } @@ -174,6 +170,25 @@ public class JdbcConnection implements Connection { holdability = HOLD_CURSORS_OVER_COMMIT; } + private TcpIgniteClient buildClient(String[] addrs, AtomicLong observableTime) { + var cfg = new IgniteClientConfigurationImpl( + null, + addrs, + netTimeout, + IgniteClientConfigurationImpl.DFLT_BACKGROUND_RECONNECT_INTERVAL, + null, + IgniteClientConfigurationImpl.DFLT_HEARTBEAT_INTERVAL, + IgniteClientConfigurationImpl.DFLT_HEARTBEAT_TIMEOUT, + null, + null, + extractSslConfiguration(connProps), + false, + extractAuthenticationConfiguration(connProps), + IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT); + + return (TcpIgniteClient) sync(TcpIgniteClient.startAsync(cfg, observableTime)); + } + /** * Constructor used for testing purposes. */ @@ -848,6 +863,11 @@ public class JdbcConnection implements Connection { return connectionId; } + /** Returns the observable timestamp reported by the channel of the underlying client. 
+ */ + long observableTimestamp() { + return client.channel().observableTimestamp(); + } + /** {@inheritDoc} */ @Override public <T> T unwrap(Class<T> iface) throws SQLException { diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java index 48270bc32c..e5b2507a58 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java @@ -143,9 +143,10 @@ public class JdbcStatement implements Statement { long correlationToken = nextToken(); JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(stmtType, schema, pageSize, maxRows, sql, args, - conn.getAutoCommit(), multiStatement, queryTimeoutMillis, correlationToken); + conn.getAutoCommit(), multiStatement, queryTimeoutMillis, correlationToken, conn.observableTimestamp()); JdbcQueryExecuteResponse res; + try { res = (JdbcQueryExecuteResponse) conn.handler().queryAsync(conn.connectionId(), req).get(); } catch (InterruptedException e) { diff --git a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java index fd935978f3..c5242c3ad1 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java @@ -27,6 +27,7 @@ import java.sql.DriverPropertyInfo; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import org.apache.ignite.internal.client.proto.ProtocolVersion; import org.apache.ignite.internal.jdbc.ConnectionPropertiesImpl; @@ -161,6 +162,8 @@ public class IgniteJdbcDriver implements Driver { /** Minor version. 
+ */ private static final int MINOR_VER = ProtocolVersion.LATEST_VER.minor(); + private static final AtomicLong tracker = new AtomicLong(); + /** {@inheritDoc} */ @Override public Connection connect(String url, Properties props) throws SQLException { @@ -172,7 +175,7 @@ public class IgniteJdbcDriver implements Driver { connProps.init(url, props); - return new JdbcConnection(connProps); + return new JdbcConnection(connProps, tracker); } /** {@inheritDoc} */ diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java index ec994d78ab..df7a1f2ef3 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java @@ -96,6 +96,36 @@ public interface HybridTimestampTracker { }; } + /** + * Creates a client-managed HybridTimestampTracker based on a given initial timestamp and an update consumer. + * + * @param intTs The initial HybridTimestamp, or null if no initial timestamp is provided. + * @param updateTs Callback invoked with every timestamp passed to {@code update} that advances the tracked value. + * @return A HybridTimestampTracker instance that uses the provided initial timestamp and update mechanism. + */ + static HybridTimestampTracker atomicClientTracker(@Nullable HybridTimestamp intTs, Consumer<HybridTimestamp> updateTs) { + return new HybridTimestampTracker() { + /** Largest timestamp seen so far, kept in its long form. */ + private final AtomicLong timestamp = new AtomicLong(hybridTimestampToLong(intTs)); + + @Override + public @Nullable HybridTimestamp get() { + return nullableHybridTimestamp(timestamp.get()); + } + + @Override + public void update(@Nullable HybridTimestamp ts) { + long tsVal = hybridTimestampToLong(ts); + long prev = timestamp.getAndAccumulate(tsVal, Math::max); + + // Notify only when this call actually advanced the stored value (an equal or older timestamp is not an update). 
+ if (prev < tsVal) { + updateTs.accept(ts); + } + } + }; + } + /** * Get the observable timestamp. *
