This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch ignite-24053
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit c4a504bf2f9bd001a8cda2ec2e95412aecf2ecf9
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Fri Jan 17 15:06:44 2025 +0300

    IGNITE-24053 Multiple connections support.
---
 .../jdbc/proto/event/JdbcBatchExecuteRequest.java  |   2 +-
 .../proto/event/JdbcBatchPreparedStmntRequest.java |   2 +-
 .../jdbc/proto/event/JdbcFinishTxResult.java       |  22 +++-
 ...hTxResult.java => JdbcObservableTimeAware.java} |  25 ++---
 .../jdbc/proto/event/JdbcQueryExecuteRequest.java  |  14 ++-
 .../ignite/internal/jdbc/proto/event/Response.java |   7 +-
 .../handler/ClientInboundMessageHandler.java       |   4 -
 .../ignite/client/handler/JdbcHandlerBase.java     |   5 -
 .../client/handler/JdbcQueryEventHandlerImpl.java  | 121 ++++++++++++++-------
 .../jdbc/ClientJdbcExecuteBatchRequest.java        |   4 +-
 .../requests/jdbc/ClientJdbcExecuteRequest.java    |   5 +-
 .../requests/jdbc/ClientJdbcFinishTxRequest.java   |   2 +-
 .../jdbc/ClientJdbcPreparedStmntBatchRequest.java  |   4 +-
 .../sql/ClientSqlExecuteScriptRequest.java         |   1 +
 .../handler/JdbcQueryEventHandlerImplTest.java     |   2 +-
 .../ignite/internal/client/ReliableChannel.java    |   6 +-
 .../ignite/internal/client/TcpIgniteClient.java    |  30 ++++-
 .../ignite/jdbc/ItJdbcMultipleConnectionsTest.java |  97 +++++++++++++++++
 .../ignite/internal/jdbc/JdbcConnection.java       |  38 +++++--
 .../apache/ignite/internal/jdbc/JdbcStatement.java |   3 +-
 .../org/apache/ignite/jdbc/IgniteJdbcDriver.java   |   5 +-
 .../ignite/internal/tx/HybridTimestampTracker.java |  30 +++++
 22 files changed, 329 insertions(+), 100 deletions(-)

diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java
index 0994126b83..d795458415 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchExecuteRequest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
 /**
  * JDBC batch execute request.
  */
-public class JdbcBatchExecuteRequest implements ClientMessage {
+public class JdbcBatchExecuteRequest extends JdbcObservableTimeAware implements ClientMessage {
     /** Schema name. */
     private String schemaName;
 
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
index 6391b29a17..87781e1b7e 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
 /**
  * JDBC prepared statement query batch execute request.
  */
-public class JdbcBatchPreparedStmntRequest implements ClientMessage {
+public class JdbcBatchPreparedStmntRequest extends JdbcObservableTimeAware implements ClientMessage {
     /** Schema name. */
     private String schemaName;
 
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java
index 5c6cbfa6f2..da32243e61 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java
@@ -17,15 +17,29 @@
 
 package org.apache.ignite.internal.jdbc.proto.event;
 
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Result of commit/rollback command.
  */
 public class JdbcFinishTxResult extends Response {
+    /** Observable timestamp used only on server side. */
+    @SuppressWarnings("TransientFieldInNonSerializableClass")
+    private final transient @Nullable HybridTimestamp observableTime;
+
     /**
      * Default constructor is used for deserialization.
      */
     public JdbcFinishTxResult() {
-        // No-op.
+        this.observableTime = null;
+    }
+
+    /**
+     * Constructor.
+     */
+    public JdbcFinishTxResult(@Nullable HybridTimestamp observableTime) {
+        this.observableTime = observableTime;
     }
 
     /**
@@ -36,5 +50,11 @@ public class JdbcFinishTxResult extends Response {
      */
     public JdbcFinishTxResult(int status, String err) {
         super(status, err);
+
+        this.observableTime = null;
+    }
+
+    public @Nullable Object observableTime() {
+        return observableTime;
     }
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcObservableTimeAware.java
similarity index 64%
copy from modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java
copy to modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcObservableTimeAware.java
index 5c6cbfa6f2..4d0ae5855e 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcFinishTxResult.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcObservableTimeAware.java
@@ -17,24 +17,17 @@
 
 package org.apache.ignite.internal.jdbc.proto.event;
 
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
 /**
- * Result of commit/rollback command.
+ * TODO fix naming add description.
  */
-public class JdbcFinishTxResult extends Response {
-    /**
-     * Default constructor is used for deserialization.
-     */
-    public JdbcFinishTxResult() {
-        // No-op.
-    }
+abstract class JdbcObservableTimeAware {
+    @SuppressWarnings("TransientFieldInNonSerializableClass")
+    private final transient AtomicReference<HybridTimestamp> observableTimeUpdatesHolder = new AtomicReference<>();
 
-    /**
-     * Constructor.
-     *
-     * @param status Status code.
-     * @param err    Error message.
-     */
-    public JdbcFinishTxResult(int status, String err) {
-        super(status, err);
+    public AtomicReference<HybridTimestamp> observableTimeUpdatesHolder() {
+        return observableTimeUpdatesHolder;
     }
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
index cd48d51d1f..3658f4efa5 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
 /**
  * JDBC query execute request.
  */
-public class JdbcQueryExecuteRequest implements ClientMessage {
+public class JdbcQueryExecuteRequest extends JdbcObservableTimeAware implements ClientMessage {
     /** Expected statement type. */
     private JdbcStatementType stmtType;
 
@@ -61,6 +61,8 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
      */
     private long correlationToken;
 
+    private long observableTime;
+
     /**
      * Default constructor. For deserialization purposes.
      */
@@ -91,7 +93,8 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
             boolean autoCommit,
             boolean multiStatement,
             long queryTimeoutMillis,
-            long correlationToken
+            long correlationToken,
+            long observableTime
     ) {
         Objects.requireNonNull(stmtType);
 
@@ -105,6 +108,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
         this.multiStatement = multiStatement;
         this.queryTimeoutMillis = queryTimeoutMillis;
         this.correlationToken = correlationToken;
+        this.observableTime = observableTime;
     }
 
     /**
@@ -190,6 +194,10 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
         return correlationToken;
     }
 
+    public long observableTime() {
+        return observableTime;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void writeBinary(ClientMessagePacker packer) {
@@ -204,6 +212,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
         packer.packObjectArrayAsBinaryTuple(args);
         packer.packLong(queryTimeoutMillis);
         packer.packLong(correlationToken);
+        packer.packLong(observableTime);
     }
 
     /** {@inheritDoc} */
@@ -220,6 +229,7 @@ public class JdbcQueryExecuteRequest implements ClientMessage {
         args = unpacker.unpackObjectArrayFromBinaryTuple();
         queryTimeoutMillis = unpacker.unpackLong();
         correlationToken = unpacker.unpackLong();
+        observableTime = unpacker.unpackLong();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java
index 9e625e2827..618e50cb1c 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/Response.java
@@ -39,9 +39,6 @@ public abstract class Response implements ClientMessage {
     /** Error. */
     private String err;
 
-    /** Has results. */
-    protected boolean hasResults;
-
     /**
      * Constructs successful response.
      */
@@ -121,9 +118,9 @@ public abstract class Response implements ClientMessage {
     }
 
     /**
-     * Gets hasResults flag.
+     * Gets success status.
      *
-     * @return Has results.
+     * @return {@code True} if command succeeded, {@code false} otherwise.
      */
     public boolean success() {
         return status == STATUS_SUCCESS;
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 0744e8be94..ea8d53ff98 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -713,10 +713,6 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
                 return ClientTupleContainsAllKeysRequest.process(in, out, igniteTables, resources, txManager);
 
             case ClientOp.JDBC_CONNECT:
-                // TODO: IGNITE-24053 JDBC request ought to contain the client observation timestamp.
-                jdbcQueryEventHandler.getTimestampTracker().update(clockService.current());
-                out.meta(jdbcQueryEventHandler.getTimestampTracker().get());
-
                 return ClientJdbcConnectRequest.execute(in, out, 
jdbcQueryEventHandler);
 
             case ClientOp.JDBC_EXEC:
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
index a3a54a986d..f1abedb91b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
@@ -19,7 +19,6 @@ package org.apache.ignite.client.handler;
 
 import static org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode.UNSUPPORTED_OPERATION;
 import static org.apache.ignite.internal.sql.engine.SqlQueryType.DDL;
-import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML;
 import static org.apache.ignite.internal.sql.engine.SqlQueryType.KILL;
 import static org.apache.ignite.internal.sql.engine.SqlQueryType.TX_CONTROL;
 
@@ -49,16 +48,12 @@ import org.jetbrains.annotations.Nullable;
  * Contains common methods used to process jdbc requests.
  */
 abstract class JdbcHandlerBase {
-
     /** {@link SqlQueryType}s allowed in JDBC select statements. **/
     public static final Set<SqlQueryType> SELECT_STATEMENT_QUERIES = Set.of(
             SqlQueryType.QUERY,
             SqlQueryType.EXPLAIN
     );
 
-    /** {@link SqlQueryType}s allowed in JDBC update statements. **/
-    public static final Set<SqlQueryType> UPDATE_STATEMENT_QUERIES = Set.of(DML, DDL, KILL);
-
     /** {@link SqlQueryType}s types that return 0 in executeUpdate and execute / getUpdateCount. **/
     public static final Set<SqlQueryType> ZERO_UPDATE_COUNT_QUERIES = Set.of(DDL, KILL, TX_CONTROL);
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index 3662ce2abc..0711a14427 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -34,9 +34,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog;
 import org.apache.ignite.client.handler.requests.jdbc.JdbcQueryCursor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
 import org.apache.ignite.internal.jdbc.proto.JdbcStatementType;
 import org.apache.ignite.internal.jdbc.proto.event.JdbcBatchExecuteRequest;
@@ -80,12 +82,6 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
     /** {@link SqlQueryType}s allowed in JDBC update statements. **/
     public static final Set<SqlQueryType> UPDATE_STATEMENT_QUERIES = 
Set.of(DML, SqlQueryType.DDL, SqlQueryType.KILL);
 
-    /**
-     * Observation timestamp tracker.
-     * TODO: IGNITE-24053 Remove this after the issue will be fixed.
-     * */
-    private final HybridTimestampTracker timestampTracker = 
HybridTimestampTracker.atomicTracker(null);
-
     /** Sql query processor. */
     private final QueryProcessor processor;
 
@@ -138,6 +134,28 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
         }
     }
 
+    static class ObservableTimeContext {
+        private final @Nullable InternalTransaction transaction;
+        private final HybridTimestampTracker tracker;
+
+        private ObservableTimeContext(HybridTimestampTracker tracker) {
+            this(tracker, null);
+        }
+
+        private ObservableTimeContext(HybridTimestampTracker tracker, 
@Nullable InternalTransaction transaction) {
+            this.transaction = transaction;
+            this.tracker = tracker;
+        }
+
+        @Nullable InternalTransaction transaction() {
+            return transaction;
+        }
+
+        HybridTimestampTracker tracker() {
+            return tracker;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<? extends Response> queryAsync(long connectionId, 
JdbcQueryExecuteRequest req) {
@@ -157,7 +175,11 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
         long correlationToken = req.correlationToken();
         CancellationToken token = 
connectionContext.registerExecution(correlationToken);
 
-        InternalTransaction tx = req.autoCommit() ? null : 
connectionContext.getOrStartTransaction(timestampTracker);
+        ObservableTimeContext timeContext = 
connectionContext.getOrCreateObservableTimeContext(
+                req.autoCommit(),
+                HybridTimestamp.nullableHybridTimestamp(req.observableTime()),
+                req.observableTimeUpdatesHolder()
+        );
 
         JdbcStatementType reqStmtType = req.getStmtType();
         boolean multiStatement = req.multiStatement();
@@ -168,8 +190,8 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
 
         CompletableFuture<AsyncSqlCursor<InternalSqlRow>> result = 
processor.queryAsync(
                 properties,
-                timestampTracker,
-                tx,
+                timeContext.tracker(),
+                timeContext.transaction(),
                 token,
                 req.sqlQuery(),
                 req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
@@ -181,10 +203,6 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
                 .exceptionally(t -> createErrorResult("Exception while 
executing query.", t, null));
     }
 
-    public HybridTimestampTracker getTimestampTracker() {
-        return timestampTracker;
-    }
-
     private static SqlProperties createProperties(
             JdbcStatementType stmtType, 
             boolean multiStatement, 
@@ -224,7 +242,11 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
             return CompletableFuture.completedFuture(new 
JdbcBatchExecuteResult(Response.STATUS_FAILED, "Connection is broken"));
         }
 
-        InternalTransaction tx = req.autoCommit() ? null : 
connectionContext.getOrStartTransaction(timestampTracker);
+        ObservableTimeContext timeContext = 
connectionContext.getOrCreateObservableTimeContext(
+                req.autoCommit(),
+                null,
+                req.observableTimeUpdatesHolder()
+        );
         long correlationToken = req.correlationToken();
         CancellationToken token = 
connectionContext.registerExecution(correlationToken);
         var queries = req.queries();
@@ -235,7 +257,7 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
         for (String query : queries) {
             tail = tail.thenCompose(list -> executeAndCollectUpdateCount(
                     connectionContext,
-                    tx,
+                    timeContext,
                     token,
                     query,
                     OBJECT_EMPTY_ARRAY,
@@ -265,7 +287,11 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
             return CompletableFuture.completedFuture(new 
JdbcBatchExecuteResult(Response.STATUS_FAILED, "Connection is broken"));
         }
 
-        InternalTransaction tx = req.autoCommit() ? null : 
connectionContext.getOrStartTransaction(timestampTracker);
+        ObservableTimeContext timeContext = 
connectionContext.getOrCreateObservableTimeContext(
+                req.autoCommit(),
+                null,
+                req.observableTimeUpdatesHolder()
+        );
         long correlationToken = req.correlationToken();
         CancellationToken token = 
connectionContext.registerExecution(correlationToken);
         var argList = req.getArgs();
@@ -275,7 +301,7 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
 
         for (Object[] args : argList) {
             tail = tail.thenCompose(list -> executeAndCollectUpdateCount(
-                    connectionContext, tx, token, req.getQuery(), args, 
timeoutMillis, list
+                    connectionContext, timeContext, token, req.getQuery(), 
args, timeoutMillis, list
             ));
         }
 
@@ -292,7 +318,7 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
 
     private CompletableFuture<IntArrayList> executeAndCollectUpdateCount(
             JdbcConnectionContext context,
-            @Nullable InternalTransaction tx,
+            ObservableTimeContext timeContext,
             CancellationToken token,
             String sql,
             Object[] arg,
@@ -307,8 +333,8 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
 
         CompletableFuture<AsyncSqlCursor<InternalSqlRow>> result = 
processor.queryAsync(
                 properties,
-                timestampTracker,
-                tx,
+                timeContext.tracker(),
+                timeContext.transaction(),
                 token,
                 sql,
                 arg == null ? OBJECT_EMPTY_ARRAY : arg
@@ -389,12 +415,12 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
             return CompletableFuture.completedFuture(new 
JdbcFinishTxResult(Response.STATUS_FAILED, "Connection is broken"));
         }
 
-        return 
connectionContext.finishTransactionAsync(commit).handle((ignored, t) -> {
+        return 
connectionContext.finishTransactionAsync(commit).handle((observableTime, t) -> {
             if (t != null) {
                 return new JdbcFinishTxResult(Response.STATUS_FAILED, 
t.getMessage());
             }
 
-            return new JdbcFinishTxResult();
+            return new JdbcFinishTxResult(observableTime);
         });
     }
 
@@ -429,7 +455,7 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
 
         private final ConcurrentMap<Long, CancelHandle> cancelHandles = new 
ConcurrentHashMap<>();
 
-        private @Nullable InternalTransaction tx;
+        private @Nullable ObservableTimeContext observableTimeContext;
 
         JdbcConnectionContext(
                 TxManager txManager,
@@ -443,16 +469,30 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
             return timeZoneId;
         }
 
-        /**
-         * Gets the transaction associated with the current connection, starts 
a new one if it doesn't already exist.
-         *
-         * <p>NOTE: this method is not thread-safe and should only be called 
by a single thread.
-         *
-         * @param timestampProvider Observation timestamp provider.
-         * @return Transaction associated with the current connection.
-         */
-        InternalTransaction getOrStartTransaction(HybridTimestampTracker 
timestampProvider) {
-            return tx == null ? tx = txManager.begin(timestampProvider, false) 
: tx;
+        ObservableTimeContext getOrCreateObservableTimeContext(
+                boolean useImplicitTx,
+                @Nullable HybridTimestamp timeFromClient,
+                AtomicReference<HybridTimestamp> observableTimeHolder
+        ) {
+            if (observableTimeContext == null) {
+                if (useImplicitTx) {
+                    return new 
ObservableTimeContext(HybridTimestampTracker.atomicClientTracker(
+                            timeFromClient,
+                            observableTimeHolder::set
+                    ));
+                } else {
+                    HybridTimestampTracker tracker = 
HybridTimestampTracker.atomicTracker(null);
+
+                    observableTimeContext = new ObservableTimeContext(
+                            tracker,
+                            txManager.begin(tracker, false)
+                    );
+                }
+
+                return observableTimeContext;
+            }
+
+            return observableTimeContext;
         }
 
         /**
@@ -463,16 +503,23 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
          * @param commit {@code True} to commit, {@code false} to rollback.
          * @return Future that represents the pending completion of the 
operation.
          */
-        CompletableFuture<Void> finishTransactionAsync(boolean commit) {
-            InternalTransaction tx0 = tx;
+        CompletableFuture<@Nullable HybridTimestamp> 
finishTransactionAsync(boolean commit) {
+            if (observableTimeContext == null) {
+                return nullCompletedFuture();
+            }
+
+            InternalTransaction tx0 = observableTimeContext.transaction();
+            HybridTimestampTracker tracker = observableTimeContext.tracker();
 
-            tx = null;
+            observableTimeContext = null;
 
             if (tx0 == null) {
                 return nullCompletedFuture();
             }
 
-            return commit ? tx0.commitAsync() : tx0.rollbackAsync();
+            return commit
+                    ? tx0.commitAsync().thenApply(ignore -> tracker.get())
+                    : tx0.rollbackAsync().thenApply(ignore -> null);
         }
 
         boolean valid() {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
index d21534938f..433a13f62b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
@@ -47,9 +47,7 @@ public class ClientJdbcExecuteBatchRequest {
         req.readBinary(in);
 
         return handler.batchAsync(connectionId, req).thenAccept(res -> {
-            if (req.autoCommit()) {
-                out.meta(handler.getTimestampTracker().get());
-            }
+            out.meta(req.observableTimeUpdatesHolder().get());
 
             res.writeBinary(out);
         });
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java
index 8e527272df..6baaed2579 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteRequest.java
@@ -41,15 +41,12 @@ public class ClientJdbcExecuteRequest {
             JdbcQueryEventHandlerImpl handler
     ) {
         var req = new JdbcQueryExecuteRequest();
-
         long connectionId = in.unpackLong();
 
         req.readBinary(in);
 
         return handler.queryAsync(connectionId, req).thenAccept(res -> {
-            if (req.autoCommit()) {
-                out.meta(handler.getTimestampTracker().get());
-            }
+            out.meta(req.observableTimeUpdatesHolder().get());
 
             res.writeBinary(out);
         });
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java
index b1982a26e6..7aeb903b3c 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcFinishTxRequest.java
@@ -44,7 +44,7 @@ public class ClientJdbcFinishTxRequest {
 
         return handler.finishTxAsync(connectionId, commit).thenAccept(res -> {
             if (commit) {
-                out.meta(handler.getTimestampTracker().get());
+                out.meta(res.observableTime());
             }
 
             res.writeBinary(out);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
index 4bd46661a3..00ef7114c4 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
@@ -47,9 +47,7 @@ public class ClientJdbcPreparedStmntBatchRequest {
         req.readBinary(in);
 
         return handler.batchPrepStatementAsync(connectionId, 
req).thenAccept(res -> {
-            if (req.autoCommit()) {
-                out.meta(handler.getTimestampTracker().get());
-            }
+            out.meta(req.observableTimeUpdatesHolder().get());
 
             res.writeBinary(out);
         });
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
index 328d39b1a0..00a6712927 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
@@ -50,6 +50,7 @@ public class ClientSqlExecuteScriptRequest {
         }
 
         HybridTimestamp clientTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
+        // TODO Thin client script execution is broken - should return 
observable time to client.
         var tsUpdater = HybridTimestampTracker.clientTracker(clientTs, ts -> 
{});
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-23646 Pass 
cancellation token to the query processor.
diff --git 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
index 69f7e6df60..8e2c540c9d 100644
--- 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
+++ 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
@@ -302,7 +302,7 @@ class JdbcQueryEventHandlerImplTest extends 
BaseIgniteAbstractTest {
     private static JdbcQueryExecuteRequest createExecuteRequest(String schema, 
String query, JdbcStatementType type) {
         //noinspection DataFlowIssue
         return new JdbcQueryExecuteRequest(
-                type, schema, 1024, 1, query, null, false, false, 0, 
System.currentTimeMillis()
+                type, schema, 1024, 1, query, null, false, false, 0, 
System.currentTimeMillis(), 0
         );
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 1dc83e40f0..75d4dedaea 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -115,7 +115,7 @@ public final class ReliableChannel implements AutoCloseable 
{
     private final AtomicLong partitionAssignmentTimestamp = new AtomicLong();
 
     /** Observable timestamp, or causality token. Sent by the server with 
every response, and required by some requests. */
-    private final AtomicLong observableTimestamp = new AtomicLong();
+    private final AtomicLong observableTimestamp;
 
     /** Cluster id from the first handshake. */
     private final AtomicReference<UUID> clusterId = new AtomicReference<>();
@@ -133,11 +133,13 @@ public final class ReliableChannel implements 
AutoCloseable {
     ReliableChannel(
             ClientChannelFactory chFactory,
             IgniteClientConfiguration clientCfg,
-            ClientMetricSource metrics) {
+            ClientMetricSource metrics,
+            @Nullable AtomicLong tracker) {
         this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
         this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
         this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
         this.metrics = metrics;
+        this.observableTimestamp = tracker == null ? new AtomicLong() : 
tracker;
 
         connMgr = new NettyClientConnectionMultiplexer(metrics);
         connMgr.start(clientCfg);
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 0de3774a83..814a7b26c2 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.catalog.IgniteCatalog;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.client.IgniteClientConfiguration;
@@ -95,7 +96,11 @@ public class TcpIgniteClient implements IgniteClient {
      * @param cfg Config.
      */
     private TcpIgniteClient(IgniteClientConfiguration cfg) {
-        this(TcpClientChannel::createAsync, cfg);
+        this(TcpClientChannel::createAsync, cfg, null);
+    }
+
+    private TcpIgniteClient(IgniteClientConfiguration cfg, @Nullable 
AtomicLong tracker) {
+        this(TcpClientChannel::createAsync, cfg, tracker);
     }
 
     /**
@@ -104,14 +109,14 @@ public class TcpIgniteClient implements IgniteClient {
      * @param chFactory Channel factory.
      * @param cfg Config.
      */
-    private TcpIgniteClient(ClientChannelFactory chFactory, 
IgniteClientConfiguration cfg) {
+    private TcpIgniteClient(ClientChannelFactory chFactory, 
IgniteClientConfiguration cfg, @Nullable AtomicLong tracker) {
         assert chFactory != null;
         assert cfg != null;
 
         this.cfg = cfg;
 
         metrics = new ClientMetricSource();
-        ch = new ReliableChannel(chFactory, cfg, metrics);
+        ch = new ReliableChannel(chFactory, cfg, metrics, tracker);
         tables = new ClientTables(ch, marshallers);
         transactions = new ClientTransactions(ch);
         compute = new ClientCompute(ch, tables);
@@ -166,6 +171,26 @@ public class TcpIgniteClient implements IgniteClient {
         }
     }
 
+    /**
+     * Initializes new instance of {@link IgniteClient} backed by the given timestamp holder and establishes the connection.
+     *
+     * @param cfg Thin client configuration.
+     * @param tracker Observable timestamp holder handed to the client's channel; when {@code null} the channel creates its own.
+     * @return Future representing pending completion of the operation.
+     */
+    public static CompletableFuture<IgniteClient> startAsync(IgniteClientConfiguration cfg, @Nullable AtomicLong tracker) {
+        ErrorGroups.initialize();
+
+        try {
+            //noinspection resource: returned from method
+            var client = new TcpIgniteClient(cfg, tracker);
+
+            return client.initAsync().thenApply(x -> client);
+        } catch (IgniteException e) {
+            // Construction failures are reported through the returned future rather than thrown to the caller.
+            return failedFuture(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public IgniteTables tables() {
diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultipleConnectionsTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultipleConnectionsTest.java
new file mode 100644
index 0000000000..b094a5335b
--- /dev/null
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultipleConnectionsTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Multiple JDBC connections test: checks that rows inserted through one connection are visible
+ * to a query issued right afterwards through another connection.
+ */
+public class ItJdbcMultipleConnectionsTest extends AbstractJdbcSelfTest {
+
+    /** JDBC URL pointing at client port 10800. */
+    private static final String URL1 = "jdbc:ignite:thin://127.0.0.1:10800";
+
+    /** JDBC URL pointing at client port 10801. */
+    private static final String URL2 = "jdbc:ignite:thin://127.0.0.1:10801";
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @AfterEach
+    public void dropTable() {
+        sql("DROP TABLE IF EXISTS T1");
+    }
+
+    /** Rows inserted through one connection must be visible through a second connection opened with the same URL. */
+    @Test
+    public void testMultipleConnectionsSingleServer() throws SQLException {
+        // try-with-resources so that both connections are closed even when an assertion fails.
+        try (Connection conn1 = DriverManager.getConnection(URL1)) {
+            try (Connection conn2 = DriverManager.getConnection(URL1)) {
+                conn1.setAutoCommit(true);
+                conn2.setAutoCommit(true);
+
+                try (Statement stmt2 = conn2.createStatement()) {
+                    try (Statement stmt1 = conn1.createStatement()) {
+                        checkVisibility(stmt1, stmt2);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Rows inserted through one connection must be visible through a second connection opened with a different URL. */
+    @Test
+    public void testMultipleConnectionsMultipleServer() throws SQLException {
+        try (Connection conn1 = DriverManager.getConnection(URL1)) {
+            try (Connection conn2 = DriverManager.getConnection(URL2)) {
+
+                conn1.setAutoCommit(true);
+                conn2.setAutoCommit(true);
+
+                try (Statement stmt2 = conn2.createStatement()) {
+                    try (Statement stmt1 = conn1.createStatement()) {
+                        checkVisibility(stmt1, stmt2);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates table T1 via {@code stmt1}, then inserts rows one at a time via {@code stmt1} and, after each insert,
+     * asserts through {@code stmt2} that the row count equals the number of rows inserted so far.
+     *
+     * @param stmt1 Statement used for the DDL and the inserts.
+     * @param stmt2 Statement used for the count query.
+     * @throws SQLException If any statement fails.
+     */
+    private static void checkVisibility(Statement stmt1, Statement stmt2) throws SQLException {
+        stmt1.executeUpdate("CREATE TABLE T1(id INT PRIMARY KEY)");
+
+        int rowsCount = 100;
+
+        // Ids 0..rowsCount-1, i.e. exactly rowsCount inserts.
+        for (int i = 0; i < rowsCount; i++) {
+            assertThat(stmt1.executeUpdate("INSERT INTO T1 VALUES (" + i + ")"), is(1));
+
+            try (ResultSet rs = stmt2.executeQuery("SELECT count(*) FROM T1 WHERE id >= 0")) {
+                assertTrue(rs.next());
+                assertThat("i=" + i, rs.getLong(1), is(i + 1L));
+            }
+        }
+    }
+}
diff --git 
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
 
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
index 779942a718..5eb312d57d 100644
--- 
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
+++ 
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
@@ -23,6 +23,7 @@ import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
 import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
 import static 
org.apache.ignite.internal.jdbc.proto.SqlStateCode.CLIENT_CONNECTION_FAILED;
 import static 
org.apache.ignite.internal.jdbc.proto.SqlStateCode.CONNECTION_CLOSED;
+import static org.apache.ignite.internal.util.ViewUtils.sync;
 
 import java.sql.Array;
 import java.sql.Blob;
@@ -54,10 +55,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.client.BasicAuthenticator;
-import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.client.IgniteClientAuthenticator;
+import org.apache.ignite.client.IgniteClientConfiguration;
 import org.apache.ignite.client.SslConfiguration;
 import org.apache.ignite.internal.client.HostAndPort;
+import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
 import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode;
 import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
@@ -127,7 +129,7 @@ public class JdbcConnection implements Connection {
      *
      * @param props Connection properties.
      */
-    public JdbcConnection(ConnectionProperties props) throws SQLException {
+    public JdbcConnection(ConnectionProperties props, AtomicLong tracker) 
throws SQLException {
         this.connProps = props;
         autoCommit = true;
 
@@ -138,13 +140,7 @@ public class JdbcConnection implements Connection {
         qryTimeout = connProps.getQueryTimeout();
 
         try {
-            client = ((TcpIgniteClient) IgniteClient.builder()
-                    .addresses(addrs)
-                    .connectTimeout(netTimeout)
-                    .ssl(extractSslConfiguration(connProps))
-                    
.authenticator(extractAuthenticationConfiguration(connProps))
-                    .build());
-
+            client = buildClient(addrs, tracker);
         } catch (Exception e) {
             throw new SQLException("Failed to connect to server", 
CLIENT_CONNECTION_FAILED, e);
         }
@@ -174,6 +170,25 @@ public class JdbcConnection implements Connection {
         holdability = HOLD_CURSORS_OVER_COMMIT;
     }
 
+    private TcpIgniteClient buildClient(String[] addrs, AtomicLong 
observableTime) {
+        var cfg = new IgniteClientConfigurationImpl(
+                null,
+                addrs,
+                netTimeout,
+                
IgniteClientConfigurationImpl.DFLT_BACKGROUND_RECONNECT_INTERVAL,
+                null,
+                IgniteClientConfigurationImpl.DFLT_HEARTBEAT_INTERVAL,
+                IgniteClientConfigurationImpl.DFLT_HEARTBEAT_TIMEOUT,
+                null,
+                null,
+                extractSslConfiguration(connProps),
+                false,
+                extractAuthenticationConfiguration(connProps),
+                IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT);
+
+        return (TcpIgniteClient) sync(TcpIgniteClient.startAsync(cfg, 
observableTime));
+    }
+
     /**
      * Constructor used for testing purposes.
      */
@@ -848,6 +863,11 @@ public class JdbcConnection implements Connection {
         return connectionId;
     }
 
+    /** Returns an observable timestamp. */
+    long observableTimestamp() {
+        return client.channel().observableTimestamp();
+    }
+
     /** {@inheritDoc} */
     @Override
     public <T> T unwrap(Class<T> iface) throws SQLException {
diff --git 
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java 
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
index 48270bc32c..e5b2507a58 100644
--- 
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
+++ 
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
@@ -143,9 +143,10 @@ public class JdbcStatement implements Statement {
         long correlationToken = nextToken();
 
         JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(stmtType, 
schema, pageSize, maxRows, sql, args,
-                conn.getAutoCommit(), multiStatement, queryTimeoutMillis, 
correlationToken);
+                conn.getAutoCommit(), multiStatement, queryTimeoutMillis, 
correlationToken, conn.observableTimestamp());
 
         JdbcQueryExecuteResponse res;
+
         try {
             res = (JdbcQueryExecuteResponse) 
conn.handler().queryAsync(conn.connectionId(), req).get();
         } catch (InterruptedException e) {
diff --git 
a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java 
b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
index fd935978f3..c5242c3ad1 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
@@ -27,6 +27,7 @@ import java.sql.DriverPropertyInfo;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.jdbc.ConnectionPropertiesImpl;
@@ -161,6 +162,8 @@ public class IgniteJdbcDriver implements Driver {
     /** Minor version. */
     private static final int MINOR_VER = ProtocolVersion.LATEST_VER.minor();
 
+    private static final AtomicLong tracker = new AtomicLong();
+
     /** {@inheritDoc} */
     @Override
     public Connection connect(String url, Properties props) throws 
SQLException {
@@ -172,7 +175,7 @@ public class IgniteJdbcDriver implements Driver {
 
         connProps.init(url, props);
 
-        return new JdbcConnection(connProps);
+        return new JdbcConnection(connProps, tracker);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
index ec994d78ab..df7a1f2ef3 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/HybridTimestampTracker.java
@@ -96,6 +96,36 @@ public interface HybridTimestampTracker {
         };
     }
 
+    /**
+     * Creates a client-managed HybridTimestampTracker based on a given initial timestamp and an update consumer.
+     *
+     * @param intTs The initial HybridTimestamp, or null if no initial timestamp is provided.
+     * @param updateTs A Consumer that is notified with the new HybridTimestamp each time the tracked value advances.
+     * @return A HybridTimestampTracker instance that uses the provided initial timestamp and update mechanism.
+     */
+    static HybridTimestampTracker atomicClientTracker(@Nullable HybridTimestamp intTs, Consumer<HybridTimestamp> updateTs) {
+        return new HybridTimestampTracker() {
+            /** Timestamp. */
+            private final AtomicLong timestamp = new AtomicLong(hybridTimestampToLong(intTs));
+
+            @Override
+            public @Nullable HybridTimestamp get() {
+                return nullableHybridTimestamp(timestamp.get());
+            }
+
+            @Override
+            public void update(@Nullable HybridTimestamp ts) {
+                long tsVal = hybridTimestampToLong(ts);
+                long prev = timestamp.getAndUpdate(x -> Math.max(x, tsVal));
+
+                // Notify only on a real advance: an equal, older or (presumably zero-mapped) null timestamp changes nothing.
+                if (tsVal > prev) {
+                    updateTs.accept(ts);
+                }
+            }
+        };
+    }
+
     /**
      * Get the observable timestamp.
      *


Reply via email to