This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7547d173484 IGNITE-22090 Piggyback tx start in first tx request
7547d173484 is described below

commit 7547d1734846b11e53c682e2b03ca5b4def808af
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue May 27 09:28:38 2025 +0300

    IGNITE-22090 Piggyback tx start in first tx request
---
 .../client/proto/ProtocolBitmaskFeature.java       |   7 +-
 .../internal/client/proto/tx/ClientTxUtils.java    |   5 +-
 .../ignite/client/handler/ItClientHandlerTest.java |   8 +-
 .../ignite/client/handler/ClientHandlerModule.java |   3 +-
 .../handler/ClientInboundMessageHandler.java       |  29 ++++-
 .../requests/sql/ClientSqlExecuteBatchRequest.java |   2 +-
 .../requests/sql/ClientSqlExecuteRequest.java      |   2 +-
 .../sql/ClientSqlQueryMetadataRequest.java         |   2 +-
 .../handler/requests/table/ClientTableCommon.java  | 101 ++++++++++----
 .../table/ClientTupleContainsAllKeysRequest.java   |   2 +-
 .../table/ClientTupleContainsKeyRequest.java       |   2 +-
 .../table/ClientTupleDeleteAllExactRequest.java    |   2 +-
 .../table/ClientTupleDeleteAllRequest.java         |   2 +-
 .../table/ClientTupleDeleteExactRequest.java       |   2 +-
 .../requests/table/ClientTupleDeleteRequest.java   |   2 +-
 .../requests/table/ClientTupleGetAllRequest.java   |   2 +-
 .../table/ClientTupleGetAndDeleteRequest.java      |   2 +-
 .../table/ClientTupleGetAndReplaceRequest.java     |   2 +-
 .../table/ClientTupleGetAndUpsertRequest.java      |   2 +-
 .../requests/table/ClientTupleGetRequest.java      |   2 +-
 .../table/ClientTupleInsertAllRequest.java         |   2 +-
 .../requests/table/ClientTupleInsertRequest.java   |   2 +-
 .../table/ClientTupleReplaceExactRequest.java      |   2 +-
 .../requests/table/ClientTupleReplaceRequest.java  |   2 +-
 .../requests/table/ClientTupleRequestBase.java     |  15 ++-
 .../table/ClientTupleUpsertAllRequest.java         |   2 +-
 .../requests/table/ClientTupleUpsertRequest.java   |   2 +-
 .../requests/table/ClientTuplesRequestBase.java    |  14 +-
 .../requests/tx/ClientTransactionBeginRequest.java |  44 +------
 .../tx/ClientTransactionCommitRequest.java         |  12 +-
 .../ignite/internal/client/ReliableChannel.java    |  35 +++--
 .../ignite/internal/client/TcpClientChannel.java   |   3 +-
 .../ignite/internal/client/WriteContext.java       |  12 +-
 .../internal/client/compute/ClientCompute.java     |   2 -
 .../client/compute/ClientJobExecution.java         |   4 -
 .../ignite/internal/client/sql/ClientSql.java      |   4 +-
 .../internal/client/table/ClientDataStreamer.java  |   1 -
 .../ignite/internal/client/table/ClientTable.java  | 145 +++++++++++++++------
 .../internal/client/tx/ClientLazyTransaction.java  |  54 +++++---
 .../internal/client/tx/ClientTransaction.java      |  25 ++--
 .../internal/client/tx/ClientTransactions.java     |  47 ++++---
 .../org/apache/ignite/client/ConnectionTest.java   |   4 +-
 .../ignite/client/FeatureCompatibilityTest.java    |  95 ++++++++++++++
 .../client/ObservableTimestampPropagationTest.java |  14 +-
 .../org/apache/ignite/client/RetryPolicyTest.java  |   4 +-
 .../ignite/client/TestClientHandlerModule.java     |  26 +++-
 .../java/org/apache/ignite/client/TestServer.java  |  10 +-
 .../RepeatedFinishClientTransactionTest.java       |   5 +-
 .../app/client/ItThinClientTransactionsTest.java   |  38 +++++-
 .../ItThinClientTransactionsWithReplicasTest.java  |   6 +-
 50 files changed, 553 insertions(+), 259 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 80df673426a..78c7109d885 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -57,7 +57,12 @@ public enum ProtocolBitmaskFeature {
     /**
      * Delayed ack optimization for directly mapped transactions.
      */
-    TX_DELAYED_ACKS(6);
+    TX_DELAYED_ACKS(6),
+
+    /**
+     * Piggyback txn start in the first request.
+     */
+    TX_PIGGYBACK(7);
 
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
             EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
index 86568ae61d6..151c361566e 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
@@ -21,6 +21,9 @@ package org.apache.ignite.internal.client.proto.tx;
  * Utility class for client transactions.
  */
 public class ClientTxUtils {
-    /** Tx resource id for direct mapping. */
+    /** Tx resource id for directly mapped request. */
     public static final long TX_ID_DIRECT = 0L;
+
+    /** Tx resource id for directly mapped first request. */
+    public static final long TX_ID_FIRST_DIRECT = -1L;
 }
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index bc1aa75bbac..f20fa71956c 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -488,10 +488,13 @@ public class ItClientHandlerTest extends 
BaseIgniteAbstractTest {
             packer.packInt(2); // Client type: general purpose.
 
             BitSet clientFeatures = new BitSet();
-            // Supported feature
+            // Supported features
             clientFeatures.set(1);
+            clientFeatures.set(2);
+            clientFeatures.set(6);
+            clientFeatures.set(7);
             // Unsupported feature
-            clientFeatures.set(3);
+            clientFeatures.set(4);
 
             packer.packBinaryHeader(clientFeatures.toByteArray().length); // 
Features.);
             packer.writePayload(clientFeatures.toByteArray());
@@ -538,6 +541,7 @@ public class ItClientHandlerTest extends 
BaseIgniteAbstractTest {
             expected.set(3);
             expected.set(5);
             expected.set(6);
+            expected.set(7);
             assertEquals(expected, supportedFeatures);
 
             var extensionsLen = unpacker.unpackInt();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 055c4c51176..d5b691d8b85 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -88,7 +88,8 @@ public class ClientHandlerModule implements IgniteComponent, 
PlatformComputeTran
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING,
             ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB,
             ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS,
-            ProtocolBitmaskFeature.TX_DELAYED_ACKS
+            ProtocolBitmaskFeature.TX_DELAYED_ACKS,
+            ProtocolBitmaskFeature.TX_PIGGYBACK
     ));
 
     /** Connection id generator.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index ad4373ca7d5..d7cd60a0758 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLA
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
@@ -476,11 +477,26 @@ public class ClientInboundMessageHandler
             BitSet clientFeatures,
             ProtocolVersion clientVer,
             int clientCode) {
-        BitSet mutuallySupportedFeatures = 
HandshakeUtils.supportedFeatures(features, clientFeatures);
+        // Disable direct mapping if not all required features are supported.
+        boolean supportsDirectMapping = 
features.get(TX_DIRECT_MAPPING.featureId()) && 
clientFeatures.get(TX_DIRECT_MAPPING.featureId())
+                && features.get(TX_DELAYED_ACKS.featureId()) && 
clientFeatures.get(TX_DELAYED_ACKS.featureId())
+                && features.get(TX_PIGGYBACK.featureId()) && 
clientFeatures.get(TX_PIGGYBACK.featureId());
 
-        clientContext = new ClientContext(clientVer, clientCode, 
mutuallySupportedFeatures, user);
+        BitSet actualFeatures;
 
-        sendHandshakeResponse(ctx, packer);
+        if (!supportsDirectMapping) {
+            actualFeatures = (BitSet) this.features.clone();
+
+            actualFeatures.clear(TX_DIRECT_MAPPING.featureId());
+            actualFeatures.clear(TX_DELAYED_ACKS.featureId());
+            actualFeatures.clear(TX_PIGGYBACK.featureId());
+        } else {
+            actualFeatures = this.features;
+        }
+
+        clientContext = new ClientContext(clientVer, clientCode, 
HandshakeUtils.supportedFeatures(actualFeatures, clientFeatures), user);
+
+        sendHandshakeResponse(ctx, packer, actualFeatures);
     }
 
     private void handshakeError(ChannelHandlerContext ctx, ClientMessagePacker 
packer, Throwable t) {
@@ -508,7 +524,7 @@ public class ClientInboundMessageHandler
         metrics.sessionsRejectedIncrement();
     }
 
-    private void sendHandshakeResponse(ChannelHandlerContext ctx, 
ClientMessagePacker packer) {
+    private void sendHandshakeResponse(ChannelHandlerContext ctx, 
ClientMessagePacker packer, BitSet mutuallySupportedFeatures) {
         ProtocolVersion.LATEST_VER.pack(packer);
         packer.packNil(); // No error.
 
@@ -538,7 +554,7 @@ public class ClientInboundMessageHandler
         packer.packByteNullable(IgniteProductVersion.CURRENT_VERSION.patch());
         
packer.packStringNullable(IgniteProductVersion.CURRENT_VERSION.preRelease());
 
-        HandshakeUtils.packFeatures(packer, features);
+        HandshakeUtils.packFeatures(packer, mutuallySupportedFeatures);
         HandshakeUtils.packExtensions(packer, extensions);
 
         write(packer, ctx);
@@ -854,8 +870,7 @@ public class ClientInboundMessageHandler
                 return ClientJdbcPrimaryKeyMetadataRequest.process(in, 
jdbcQueryEventHandler);
 
             case ClientOp.TX_BEGIN:
-                return ClientTransactionBeginRequest.process(in, txManager, 
resources, metrics, igniteTables,
-                        clientContext.hasAllFeatures(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS), tsTracker);
+                return ClientTransactionBeginRequest.process(in, txManager, 
resources, metrics, tsTracker);
 
             case ClientOp.TX_COMMIT:
                 return ClientTransactionCommitRequest.process(in, resources, 
metrics, clockService, igniteTables,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 085cdf1795d..2b7d560f339 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -63,7 +63,7 @@ public class ClientSqlExecuteBatchRequest {
         CancelHandle cancelHandle = CancelHandle.create();
         cancelHandleMap.put(requestId, cancelHandle);
 
-        InternalTransaction tx = readTx(in, tsTracker, resources, null, null);
+        InternalTransaction tx = readTx(in, tsTracker, resources, null, null, 
null);
         ClientSqlProperties props = new ClientSqlProperties(in);
         String statement = in.unpackString();
         BatchedArguments arguments = readArgs(in);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 169b88651e9..93f3afd96a6 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -81,7 +81,7 @@ public class ClientSqlExecuteRequest {
         CancelHandle cancelHandle = CancelHandle.create();
         cancelHandles.put(requestId, cancelHandle);
 
-        InternalTransaction tx = readTx(in, tsUpdater, resources, null, null);
+        InternalTransaction tx = readTx(in, tsUpdater, resources, null, null, 
null);
         ClientSqlProperties props = new ClientSqlProperties(in);
         String statement = in.unpackString();
         Object[] arguments = readArgsNotNull(in);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 83f83c3e227..4437b70536b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -51,7 +51,7 @@ public class ClientSqlQueryMetadataRequest {
             ClientResourceRegistry resources,
             HybridTimestampTracker tsTracker
     ) {
-        var tx = readTx(in, tsTracker, resources, null, null);
+        var tx = readTx(in, tsTracker, resources, null, null, null);
 
         String schema = in.unpackString();
         String query = in.unpackString();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 7d31321be48..3291b83d25b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.table;
 
 import static 
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
 import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
+import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
 import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
 
@@ -26,6 +27,7 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientResource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
@@ -314,29 +316,61 @@ public class ClientTableCommon {
      * Write tx metadata.
      *
      * @param out Packer.
+     * @param tsTracker Timestamp tracker.
      * @param clockService Clock service.
-     * @param tx The transaction.
+     * @param req Request.
      */
-    public static void writeTxMeta(
-            ClientMessagePacker out, HybridTimestampTracker tsTracker, 
@Nullable ClockService clockService, InternalTransaction tx) {
-        if (!tx.remote()) {
-            return;
-        }
+    static void writeTxMeta(
+            ClientMessagePacker out, HybridTimestampTracker tsTracker, 
@Nullable ClockService clockService, ClientTupleRequestBase req) {
+        writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
+    }
 
-        TxState state = tx.state();
+    /**
+     * Write tx metadata.
+     *
+     * @param out Packer.
+     * @param tsTracker Timestamp tracker.
+     * @param clockService Clock service.
+     * @param req Request.
+     */
+    static void writeTxMeta(
+            ClientMessagePacker out, HybridTimestampTracker tsTracker, 
@Nullable ClockService clockService, ClientTuplesRequestBase req) {
+        writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
+    }
 
-        if (state == TxState.ABORTED) {
-            // No-op enlistment.
-            out.packNil();
-        } else {
-            // Remote tx carries operation enlistment info.
-            PendingTxPartitionEnlistment token = tx.enlistedPartition(null);
-            out.packString(token.primaryNodeConsistentId());
-            out.packLong(token.consistencyToken());
-        }
+    /**
+     * Write tx metadata.
+     *
+     * @param out Packer.
+     * @param tsTracker Timestamp tracker.
+     * @param clockService Clock service.
+     * @param tx Transaction.
+     * @param resourceId Resource id.
+     */
+    public static void writeTxMeta(ClientMessagePacker out, 
HybridTimestampTracker tsTracker, @Nullable ClockService clockService,
+            InternalTransaction tx, long resourceId) {
+        if (resourceId != 0) {
+            // Resource id is assigned on a first request in direct mode.
+            out.packLong(resourceId);
+            out.packUuid(tx.id());
+            out.packUuid(tx.coordinatorId());
+            out.packLong(tx.getTimeout());
+        } else if (tx.remote()) {
+            TxState state = tx.state();
+
+            if (state == TxState.ABORTED) {
+                // No-op enlistment.
+                out.packNil();
+            } else {
+                // Remote tx carries operation enlistment info.
+                PendingTxPartitionEnlistment token = 
tx.enlistedPartition(null);
+                out.packString(token.primaryNodeConsistentId());
+                out.packLong(token.consistencyToken());
+            }
 
-        if (clockService != null) {
-            tsTracker.update(clockService.current());
+            if (clockService != null) {
+                tsTracker.update(clockService.current());
+            }
         }
     }
 
@@ -358,6 +392,7 @@ public class ClientTableCommon {
      * @param resources Resource registry.
      * @param txManager Tx manager.
      * @param notificationSender Notification sender.
+     * @param resourceIdHolder Resource id holder.
      * @return Transaction, if present, or null.
      */
     public static @Nullable InternalTransaction readTx(
@@ -365,15 +400,34 @@ public class ClientTableCommon {
             HybridTimestampTracker tsUpdater,
             ClientResourceRegistry resources,
             @Nullable TxManager txManager,
-            @Nullable NotificationSender notificationSender
-    ) {
+            @Nullable NotificationSender notificationSender,
+            long[] resourceIdHolder) {
         if (in.tryUnpackNil()) {
             return null;
         }
 
         try {
             long id = in.unpackLong();
-            if (id == TX_ID_DIRECT) {
+            if (id == TX_ID_FIRST_DIRECT) {
+                long observableTs = in.unpackLong();
+
+                // This is first mapping request, which piggybacks transaction 
creation.
+                boolean readOnly = in.unpackBoolean();
+                long timeoutMillis = in.unpackLong();
+
+                InternalTxOptions txOptions = InternalTxOptions.builder()
+                        .timeoutMillis(timeoutMillis)
+                        .build();
+
+                var tx = startExplicitTx(tsUpdater, txManager, 
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly,
+                        txOptions);
+
+                // Attach resource id only on first direct request.
+                resourceIdHolder[0] = resources.put(new ClientResource(tx, 
tx::rollbackAsync));
+
+                return tx;
+            } else if (id == TX_ID_DIRECT) {
+                // This is direct request mapping.
                 long token = in.unpackLong();
                 UUID txId = in.unpackUuid();
                 int commitTableId = in.unpackInt();
@@ -416,8 +470,9 @@ public class ClientTableCommon {
             ClientResourceRegistry resources,
             TxManager txManager,
             boolean readOnly,
-            @Nullable NotificationSender notificationSender) {
-        InternalTransaction tx = readTx(in, readTs, resources, txManager, 
notificationSender);
+            @Nullable NotificationSender notificationSender,
+            long[] resourceIdHolder) {
+        InternalTransaction tx = readTx(in, readTs, resources, txManager, 
notificationSender, resourceIdHolder);
 
         if (tx == null) {
             // Implicit transactions do not use an observation timestamp 
because RW never depends on it, and implicit RO is always direct.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
index 74ddb89b931..89c37b08da7 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
@@ -53,7 +53,7 @@ public class ClientTupleContainsAllKeysRequest {
         return ClientTuplesRequestBase.readAsync(in, tables, resources, 
txManager, false, null, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().containsAllAsync(req.tx(), req.tuples())
                         .thenApply(containsAll -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(containsAll);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
index 55aaa42c8dd..e08a309c665 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
@@ -52,7 +52,7 @@ public class ClientTupleContainsKeyRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, true, null, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().containsAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index 4d8607ebdd7..026983cfa18 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleDeleteAllExactRequest {
         return ClientTuplesRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples())
                         .thenApply(skippedTuples -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             writeTuples(out, skippedTuples, 
req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index 15d604278b4..660f55c0a28 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -56,7 +56,7 @@ public class ClientTupleDeleteAllRequest {
         return ClientTuplesRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().deleteAllAsync(req.tx(), req.tuples())
                         .thenApply(skippedTuples -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             writeTuples(out, skippedTuples, TuplePart.KEY, 
req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index abd586779a5..c1a51fe3e22 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleDeleteExactRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().deleteExactAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index d537c446b12..3020caac20d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleDeleteRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().deleteAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index f1fa7a9119e..d9a4e90f52b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAllRequest {
         return ClientTuplesRequestBase.readAsync(in, tables, resources, 
txManager, false, null, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().getAllAsync(req.tx(), req.tuples())
                         .thenApply(resTuples -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             writeTuplesNullable(out, resTuples, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index 6922fa171aa..d03003add7e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAndDeleteRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().getAndDeleteAsync(req.tx(), req.tuple())
                         .thenApply(resTuple -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             ClientTableCommon.writeTupleOrNil(out, resTuple, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index bf53fe42ddd..a6a27b32991 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAndReplaceRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().getAndReplaceAsync(req.tx(), req.tuple())
                         .thenApply(resTuple -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             ClientTableCommon.writeTupleOrNil(out, resTuple, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index a5c0630d8fd..26aae656f79 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAndUpsertRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().getAndUpsertAsync(req.tx(), req.tuple())
                         .thenApply(resTuple -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             ClientTableCommon.writeTupleOrNil(out, resTuple, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index 148811f1ca5..234e2964841 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -53,7 +53,7 @@ public class ClientTupleGetRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, true, null, tsTracker, true)
                 .thenCompose(req -> 
req.table().recordView().getAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             ClientTableCommon.writeTupleOrNil(out, res, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index e918c2ecd1e..f1ca252dcc5 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -57,7 +57,7 @@ public class ClientTupleInsertAllRequest {
         return ClientTuplesRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().insertAllAsync(req.tx(), req.tuples())
                         .thenApply(skippedTuples -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             writeTuples(out, skippedTuples, 
req.table().schemaView());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index f6bcf5f908f..4988f724e04 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -56,7 +56,7 @@ public class ClientTupleInsertRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().insertAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index c3a72023b1f..240fac2f29c 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleReplaceExactRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false, true)
                 .thenCompose(req -> 
req.table().recordView().replaceAsync(req.tx(), req.tuple(), req.tuple2())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index f4fd263eaa9..49dc05f277d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleReplaceRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().replaceAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
                         }));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
index 0d33dc026c5..6908613a81a 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
@@ -39,12 +39,15 @@ class ClientTupleRequestBase {
     private final TableViewInternal table;
     private final Tuple tuple;
     private final @Nullable Tuple tuple2;
+    private final long resourceId;
 
-    private ClientTupleRequestBase(@Nullable InternalTransaction tx, 
TableViewInternal table, Tuple tuple, @Nullable Tuple tuple2) {
+    private ClientTupleRequestBase(@Nullable InternalTransaction tx, 
TableViewInternal table, Tuple tuple, @Nullable Tuple tuple2,
+            long resourceId) {
         this.tx = tx;
         this.table = table;
         this.tuple = tuple;
         this.tuple2 = tuple2;
+        this.resourceId = resourceId;
     }
 
     public InternalTransaction tx() {
@@ -53,6 +56,10 @@ class ClientTupleRequestBase {
         return tx;
     }
 
+    public long resourceId() {
+        return resourceId;
+    }
+
     public TableViewInternal table() {
         return table;
     }
@@ -95,9 +102,11 @@ class ClientTupleRequestBase {
 
         int tableId = in.unpackInt();
 
+        long[] resIdHolder = {0};
+
         InternalTransaction tx = txManager == null
                 ? null
-                : readOrStartImplicitTx(in, tsTracker, resources, txManager, 
txReadOnly, notificationSender);
+                : readOrStartImplicitTx(in, tsTracker, resources, txManager, 
txReadOnly, notificationSender, resIdHolder);
 
         int schemaId = in.unpackInt();
 
@@ -113,7 +122,7 @@ class ClientTupleRequestBase {
                             var tuple = readTuple(noValueSet, tupleBytes, 
keyOnly, schema);
                             var tuple2 = readSecondTuple ? 
readTuple(noValueSet2, tupleBytes2, keyOnly, schema) : null;
 
-                            return new ClientTupleRequestBase(tx, table, 
tuple, tuple2);
+                            return new ClientTupleRequestBase(tx, table, 
tuple, tuple2, resIdHolder[0]);
                         }));
 
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index d60dd38a926..a11e4574be3 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleUpsertAllRequest {
         return ClientTuplesRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().upsertAllAsync(req.tx(), req.tuples())
                         .thenApply(v -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index 1e785291c9c..38e94ccd4d0 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -56,7 +56,7 @@ public class ClientTupleUpsertRequest {
         return ClientTupleRequestBase.readAsync(in, tables, resources, 
txManager, false, notificationSender, tsTracker, false)
                 .thenCompose(req -> 
req.table().recordView().upsertAsync(req.tx(), req.tuple())
                         .thenApply(v -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, 
req.tx());
+                            writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                         }));
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
index 88aae188726..c6dd6ec54d4 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
@@ -40,17 +40,23 @@ class ClientTuplesRequestBase {
     private final InternalTransaction tx;
     private final TableViewInternal table;
     private final List<Tuple> tuples;
+    private final long resourceId;
 
-    private ClientTuplesRequestBase(InternalTransaction tx, TableViewInternal 
table, List<Tuple> tuples) {
+    private ClientTuplesRequestBase(InternalTransaction tx, TableViewInternal 
table, List<Tuple> tuples, long resourceId) {
         this.tx = tx;
         this.table = table;
         this.tuples = tuples;
+        this.resourceId = resourceId;
     }
 
     public InternalTransaction tx() {
         return tx;
     }
 
+    public long resourceId() {
+        return resourceId;
+    }
+
     public TableViewInternal table() {
         return table;
     }
@@ -85,7 +91,9 @@ class ClientTuplesRequestBase {
     ) {
         int tableId = in.unpackInt();
 
-        InternalTransaction tx = readOrStartImplicitTx(in, tsTracker, 
resources, txManager, txReadOnly, notificationSender);
+        long[] resIdHolder = {0};
+
+        InternalTransaction tx = readOrStartImplicitTx(in, tsTracker, 
resources, txManager, txReadOnly, notificationSender, resIdHolder);
 
         int schemaId = in.unpackInt();
 
@@ -108,7 +116,7 @@ class ClientTuplesRequestBase {
                                 tuples.add(readTuple(noValueSet[i], 
tupleBytes[i], keyOnly, schema));
                             }
 
-                            return new ClientTuplesRequestBase(tx, table, 
tuples);
+                            return new ClientTuplesRequestBase(tx, table, 
tuples, resIdHolder[0]);
                         }));
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 5a33b3971c8..2e04f3f016e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.client.handler.requests.tx;
 
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.startExplicitTx;
-import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.tableIdNotFoundException;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -29,11 +28,6 @@ import 
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.lang.IgniteSystemProperties;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.table.IgniteTablesInternal;
-import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.TxManager;
 
@@ -41,8 +35,6 @@ import org.apache.ignite.internal.tx.TxManager;
  * Client transaction begin request.
  */
 public class ClientTransactionBeginRequest {
-    private static final boolean enabledColocation = 
IgniteSystemProperties.enabledColocation();
-
     /**
      * Processes the request.
      *
@@ -50,8 +42,6 @@ public class ClientTransactionBeginRequest {
      * @param txManager Transactions.
      * @param resources Resources.
      * @param metrics Metrics.
-     * @param igniteTables Tables facade.
-     * @param enableDirectMapping Direct mapping feature.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -59,26 +49,15 @@ public class ClientTransactionBeginRequest {
             TxManager txManager,
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics,
-            IgniteTablesInternal igniteTables,
-            boolean enableDirectMapping,
             HybridTimestampTracker tsTracker
     ) throws IgniteInternalCheckedException {
         boolean readOnly = in.unpackBoolean();
         long timeoutMillis = in.unpackLong();
-        long observable = in.unpackLong();
 
         HybridTimestamp observableTs = null;
-        int tableId = -1;
-        int partition = -1;
-
         if (readOnly) {
             // Timestamp makes sense only for read-only transactions.
-            observableTs = HybridTimestamp.nullableHybridTimestamp(observable);
-        } else if (enableDirectMapping) {
-            // Read commit partition info.
-            // It may be not available if client has not yet loaded partition 
map.
-            tableId = in.unpackInt();
-            partition = in.unpackInt();
+            observableTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
         }
 
         InternalTxOptions txOptions = InternalTxOptions.builder()
@@ -87,22 +66,6 @@ public class ClientTransactionBeginRequest {
 
         var tx = startExplicitTx(tsTracker, txManager, observableTs, readOnly, 
txOptions);
 
-        // Assign commit partition at the beginning to avoid races on a client.
-        if (tableId != -1) {
-            if (enabledColocation) {
-                TableViewInternal tableView = 
igniteTables.cachedTable(tableId);
-
-                if (tableView == null) {
-                    tx.rollback();
-                    throw tableIdNotFoundException(tableId);
-                }
-
-                tx.assignCommitPartition(new 
ZonePartitionId(tableView.zoneId(), partition));
-            } else {
-                tx.assignCommitPartition(new TablePartitionId(tableId, 
partition));
-            }
-        }
-
         if (readOnly) {
             // Propagate assigned read timestamp to client to enforce 
serializability on subsequent reads from another node.
             tsTracker.update(tx.readTimestamp());
@@ -114,11 +77,6 @@ public class ClientTransactionBeginRequest {
 
             return CompletableFuture.completedFuture(out -> {
                 out.packLong(resourceId);
-
-                if (enableDirectMapping && !readOnly) {
-                    out.packUuid(tx.id());
-                    out.packUuid(tx.coordinatorId());
-                }
             });
         } catch (IgniteInternalCheckedException e) {
             tx.rollback();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index 15e9f0bd37a..114e1f8efcb 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -70,11 +70,6 @@ public class ClientTransactionCommitRequest {
 
         // Attempt to merge server and client mappings.
         if (enableDirectMapping && !tx.isReadOnly()) {
-            long causality = in.unpackLong();
-
-            // Update causality.
-            
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
-
             int cnt = in.unpackInt(); // Number of direct enlistments.
 
             List<IgniteTuple3<TablePartitionId, String, Long>> list = new 
ArrayList<>();
@@ -87,6 +82,13 @@ public class ClientTransactionCommitRequest {
                 list.add(new IgniteTuple3<>(new TablePartitionId(tableId, 
partId), consistentId, token));
             }
 
+            if (cnt > 0) {
+                long causality = in.unpackLong();
+
+                // Update causality.
+                
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
+            }
+
             Exception ex = null;
 
             for (IgniteTuple3<TablePartitionId, String, Long> enlistment : 
list) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 3b812b2bb87..f3229847db6 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
 import org.apache.ignite.client.ClientOperationType;
@@ -246,13 +247,12 @@ public final class ReliableChannel implements 
AutoCloseable {
     /**
      * Sends request and handles response asynchronously.
      *
+     * @param <T> response type.
      * @param opCode Operation code.
      * @param payloadWriter Payload writer.
      * @param payloadReader Payload reader.
-     * @param <T> response type.
      * @param preferredNodeName Unique name (consistent id) of the preferred 
target node. When a connection to the specified node
      *         exists, it will be used to handle the request; otherwise, 
default connection will be used.
-     * @param fallbackNodeName Fallback node name.
      * @param retryPolicyOverride Retry policy override.
      * @return Future for the operation.
      */
@@ -261,12 +261,11 @@ public final class ReliableChannel implements 
AutoCloseable {
             @Nullable PayloadWriter payloadWriter,
             @Nullable PayloadReader<T> payloadReader,
             @Nullable String preferredNodeName,
-            @Nullable String fallbackNodeName,
             @Nullable RetryPolicy retryPolicyOverride,
             boolean expectNotifications
     ) {
         return ClientFutureUtils.doWithRetryAsync(
-                () -> getChannelAsync(preferredNodeName, fallbackNodeName)
+                () -> getChannelAsync(preferredNodeName)
                         .thenCompose(ch -> serviceAsyncInternal(opCode, 
payloadWriter, payloadReader, expectNotifications, ch)),
                 null,
                 ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
@@ -279,9 +278,7 @@ public final class ReliableChannel implements AutoCloseable 
{
      * @param payloadWriter Payload writer.
      * @param payloadReader Payload reader.
      * @param <T> response type.
-     * @param preferredNodeName Unique name (consistent id) of the preferred 
target node. When a connection to the specified node
-     *         exists, it will be used to handle the request; otherwise, 
default connection will be used.
-     * @param fallbackNodeName Fallback node name to connect if a preferred 
node connection is not available.
+     * @param channelResolver Channel resolver.
      * @param retryPolicyOverride Retry policy override.
      * @return Future for the operation.
      */
@@ -290,13 +287,12 @@ public final class ReliableChannel implements 
AutoCloseable {
             Function<ClientChannel, CompletableFuture<Void>> channelReadyCb,
             @Nullable PayloadWriter payloadWriter,
             @Nullable PayloadReader<T> payloadReader,
-            @Nullable String preferredNodeName,
-            @Nullable String fallbackNodeName,
+            Supplier<CompletableFuture<ClientChannel>> channelResolver,
             @Nullable RetryPolicy retryPolicyOverride,
             boolean expectNotifications
     ) {
         return ClientFutureUtils.doWithRetryAsync(
-                () -> getChannelAsync(preferredNodeName, fallbackNodeName)
+                () -> channelResolver.get()
                         .thenCompose(ch -> 
channelReadyCb.apply(ch).thenApply(ignored -> ch))
                         .thenCompose(ch -> serviceAsyncInternal(opCode, 
payloadWriter, payloadReader, expectNotifications, ch)),
                 null,
@@ -320,7 +316,7 @@ public final class ReliableChannel implements AutoCloseable 
{
             @Nullable PayloadReader<T> payloadReader
     ) {
         return ClientFutureUtils.doWithRetryAsync(
-                () -> getChannelAsync(null, null)
+                () -> getChannelAsync(null)
                         .thenCompose(ch -> {
                             int opCode = opCodeFunc.applyAsInt(ch);
                             return serviceAsyncInternal(opCode, payloadWriter, 
payloadReader, false, ch);
@@ -343,7 +339,7 @@ public final class ReliableChannel implements AutoCloseable 
{
             PayloadWriter payloadWriter,
             @Nullable PayloadReader<T> payloadReader
     ) {
-        return serviceAsync(opCode, payloadWriter, payloadReader, null, null, 
null, false);
+        return serviceAsync(opCode, payloadWriter, payloadReader, null, null, 
false);
     }
 
     /**
@@ -355,7 +351,7 @@ public final class ReliableChannel implements AutoCloseable 
{
      * @return Future for the operation.
      */
     public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> 
payloadReader) {
-        return serviceAsync(opCode, null, payloadReader, null, null, null, 
false);
+        return serviceAsync(opCode, null, payloadReader, null, null, false);
     }
 
     private <T> CompletableFuture<T> serviceAsyncInternal(
@@ -371,15 +367,18 @@ public final class ReliableChannel implements 
AutoCloseable {
         });
     }
 
-    private CompletableFuture<ClientChannel> getChannelAsync(@Nullable String 
preferredNodeName, @Nullable String fallbackNodeName) {
+    /**
+     * Get the channel.
+     *
+     * @param preferredNodeName Preferred node name.
+     *
+     * @return The future.
+     */
+    public CompletableFuture<ClientChannel> getChannelAsync(@Nullable String 
preferredNodeName) {
         // 1. Preferred node connection.
         if (preferredNodeName != null) {
             ClientChannelHolder holder = 
nodeChannelsByName.get(preferredNodeName);
 
-            if (fallbackNodeName != null && holder == null) {
-                holder = nodeChannelsByName.get(fallbackNodeName);
-            }
-
             if (holder != null) {
                 return holder.getOrCreateChannelAsync().thenCompose(ch -> {
                     if (ch != null) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 6baf36f4a7e..e1c314c3376 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -88,7 +88,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             ProtocolBitmaskFeature.TABLE_GET_REQS_USE_QUALIFIED_NAME,
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING,
             ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB,
-            ProtocolBitmaskFeature.TX_DELAYED_ACKS
+            ProtocolBitmaskFeature.TX_DELAYED_ACKS,
+            ProtocolBitmaskFeature.TX_PIGGYBACK
     ));
 
     /** Minimum supported heartbeat interval. */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
index 0683a148a93..3564f3a2b0e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.ignite.internal.client;
 
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -26,4 +28,12 @@ import org.jetbrains.annotations.Nullable;
 public class WriteContext {
     public @Nullable PartitionMapping pm;
     public @Nullable Long enlistmentToken;
+    public CompletableFuture<ClientTransaction> firstReqFut;
+    public final HybridTimestampTracker tracker;
+    public boolean readOnly;
+    public @Nullable ClientChannel channel;
+
+    public WriteContext(HybridTimestampTracker tracker) {
+        this.tracker = tracker;
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 0602694b6ff..6f7057e7c2c 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -319,7 +319,6 @@ public class ClientCompute implements IgniteCompute {
                 ClientCompute::unpackSubmitTaskResult,
                 null,
                 null,
-                null,
                 true
         );
     }
@@ -336,7 +335,6 @@ public class ClientCompute implements IgniteCompute {
                 ClientCompute::unpackSubmitResult,
                 node.name(),
                 null,
-                null,
                 true
         );
     }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index efcc8ba5560..c78e3f31796 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -121,7 +121,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
                 ClientJobExecution::unpackJobState,
                 null,
                 null,
-                null,
                 false
         );
     }
@@ -135,7 +134,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
                 ClientJobExecution::unpackTaskState,
                 null,
                 null,
-                null,
                 false
         );
     }
@@ -149,7 +147,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
                 ClientJobExecution::unpackBooleanResult,
                 null,
                 null,
-                null,
                 false
         );
     }
@@ -166,7 +163,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
                 ClientJobExecution::unpackBooleanResult,
                 null,
                 null,
-                null,
                 false
         );
     }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 14aeb37aade..3af0b29e9be 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -283,7 +283,7 @@ public class ClientSql implements IgniteSql {
         if (transaction != null) {
             try {
                 //noinspection resource
-                return ClientLazyTransaction.ensureStarted(transaction, ch, 
null)
+                return ClientLazyTransaction.ensureStarted(transaction, 
ch).get1()
                         .thenCompose(tx -> 
tx.channel().serviceAsync(ClientOp.SQL_EXEC, payloadWriter, payloadReader))
                         .exceptionally(ClientSql::handleException);
             } catch (TransactionException e) {
@@ -350,7 +350,7 @@ public class ClientSql implements IgniteSql {
         if (transaction != null) {
             try {
                 //noinspection resource
-                return ClientLazyTransaction.ensureStarted(transaction, ch, 
null)
+                return ClientLazyTransaction.ensureStarted(transaction, 
ch).get1()
                         .thenCompose(tx -> 
tx.channel().serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter, 
payloadReader))
                         .exceptionally(ClientSql::handleException);
             } catch (TransactionException e) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 251701cb44b..a4300b7f014 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -104,7 +104,6 @@ class ClientDataStreamer {
                                         ? 
StreamerReceiverSerializer.deserializeReceiverResultsOnClient(in.in())
                                         : null,
                                 partitionAssignment.get(partitionId),
-                                null,
                                 new 
RetryLimitPolicy().retryLimit(options.retryLimit()),
                                 false)
                 );
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index e81bf06acc7..cb0ecb7a10c 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -21,7 +21,9 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
+import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
 import static 
org.apache.ignite.internal.client.tx.ClientLazyTransaction.ensureStarted;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
@@ -35,12 +37,12 @@ import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHE
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.ignite.client.RetryPolicy;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.ClientSchemaVersionMismatchException;
@@ -56,6 +58,7 @@ import 
org.apache.ignite.internal.client.proto.ColumnTypeConverter;
 import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.client.table.api.PublicApiClientKeyValueView;
 import org.apache.ignite.internal.client.table.api.PublicApiClientRecordView;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -311,17 +314,26 @@ public class ClientTable implements Table {
         if (tx == null) {
             out.out().packNil();
         } else {
-            ClientTransaction tx0 = ClientTransaction.get(tx);
-
-            if (ctx != null && ctx.enlistmentToken != null) {
-                out.out().packLong(TX_ID_DIRECT); // For direct enlistment, 
pass 0 for resourceId to distinguish with proxy mode.
-                out.out().packLong(ctx.enlistmentToken);
-                out.out().packUuid(tx0.txId());
-                out.out().packInt(tx0.commitTableId());
-                out.out().packInt(tx0.commitPartition());
-                out.out().packUuid(tx0.coordinatorId());
-                out.out().packLong(tx0.timeout());
+            if (ctx != null && (ctx.enlistmentToken != null || ctx.firstReqFut 
!= null)) {
+                if (ctx.firstReqFut != null) {
+                    ClientLazyTransaction tx0 = (ClientLazyTransaction) tx;
+                    out.out().packLong(TX_ID_FIRST_DIRECT);
+                    out.out().packLong(ctx.tracker.get().longValue());
+                    out.out().packBoolean(tx.isReadOnly());
+                    out.out().packLong(tx0.timeout());
+                } else {
+                    ClientTransaction tx0 = ClientTransaction.get(tx);
+                    out.out().packLong(TX_ID_DIRECT);
+                    out.out().packLong(ctx.enlistmentToken);
+                    out.out().packUuid(tx0.txId());
+                    out.out().packInt(tx0.commitTableId());
+                    out.out().packInt(tx0.commitPartition());
+                    out.out().packUuid(tx0.coordinatorId());
+                    out.out().packLong(tx0.timeout());
+                }
             } else {
+                ClientTransaction tx0 = ClientTransaction.get(tx);
+
                 //noinspection resource
                 if (tx0.channel() != out.clientChannel()) {
                     // Do not throw IgniteClientConnectionException to avoid 
retry kicking in.
@@ -488,24 +500,64 @@ public class ClientTable implements Table {
                 .thenCompose(v -> {
                     ClientSchema schema = schemaFut.getNow(null);
 
-                    Supplier<PartitionMapping> sup =
-                            () -> getPreferredNodeName(tableId(), provider, 
partitionsFut.getNow(null), schema, true);
-                    return ensureStarted(tx, ch, sup).thenCompose(tx0 -> {
-                        // Force coordinator mode for implicit transactions.
-                        @Nullable PartitionMapping forOp =
-                                getPreferredNodeName(tableId(), provider, 
partitionsFut.getNow(null), schema, tx0 == null);
-
-                        WriteContext ctx = new WriteContext();
-                        // Force proxy mode for requests colocated with 
coordinator to reduce passed enlistment info on commit.
-                        ctx.pm = tx0 != null && forOp != null && 
forOp.nodeConsistentId().equals(tx0.nodeName()) ? null : forOp;
+                    // If a partition mapping is known apriori, a request for 
explicit RW txn will be attempted in direct mode.
+                    // Direct mode is only possible if:
+                    // * a client's connection exists to a corresponding node.
+                    // * a transaction has commit partition
+                    @Nullable PartitionMapping pm =
+                            getPreferredNodeName(tableId(), provider, 
partitionsFut.getNow(null), schema, tx == null || tx.isReadOnly());
+
+                    // Write context carries request execution details over 
async chain.
+                    WriteContext ctx = new 
WriteContext(ch.observableTimestamp());
+
+                    CompletableFuture<ClientTransaction> txStartFut;
+
+                    if (tx == null) {
+                        txStartFut = nullCompletedFuture();
+                    } else {
+                        if (pm == null) {
+                            txStartFut = ensureStarted(tx, ch, () -> 
ch.getChannelAsync(null)).get1();
+                        } else {
+                            txStartFut = 
ch.getChannelAsync(pm.nodeConsistentId()).thenCompose(ch0 -> {
+                                // Enough to check only TX_PIGGYBACK flag - 
other tx flags are set if this flag is set.
+                                boolean supports = 
ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK)
+                                        && 
ch0.protocolContext().clusterNode().name().equals(pm.nodeConsistentId());
+
+                                assert 
ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
+
+                                
IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean> tuple = 
ensureStarted(tx, ch,
+                                        supports ? null : () -> 
completedFuture(ch0));
+
+                                // If this is the first direct request in 
transaction, it will also piggyback a transaction start.
+                                if (tuple.get2()) {
+                                    ctx.pm = pm;
+                                    ctx.readOnly = tx.isReadOnly();
+                                    ctx.channel = ch0;
+                                    ctx.firstReqFut = tuple.get1();
+                                    return nullCompletedFuture();
+                                } else {
+                                    return tuple.get1();
+                                }
+                            });
+                        }
+                    }
 
+                    return txStartFut.thenCompose(tx0 -> {
                         return ch.serviceAsync(opCode,
-                                        (opCh) -> useDirectMapping(tx0, ctx, 
opCh) ? enlistDirect(tx0, ch, opCh, ctx, opCode)
-                                                : nullCompletedFuture(),
+                                        (opCh) -> {
+                                            if (tx0 != null && 
tx0.hasCommitPartition()
+                                                    // If a request is 
colocated with a coordinator, it's executed in proxy mode.
+                                                    && 
!tx0.nodeName().equals(opCh.protocolContext().clusterNode().name())) {
+                                                ctx.pm = pm;
+                                                return enlistDirect(tx0, ch, 
opCh, ctx, opCode);
+                                            } else {
+                                                return nullCompletedFuture();
+                                            }
+                                        },
                                         w -> writer.accept(schema, w, ctx),
                                         r -> readSchemaAndReadData(schema, r, 
reader, defaultValue, responseSchemaRequired, ctx, tx0),
-                                        resolvePreferredNode(tx0, ctx.pm),
-                                        tx0 == null ? null : tx0.nodeName(),
+                                        () -> ctx.firstReqFut != null ? 
completedFuture(ctx.channel)
+                                                : 
ch.getChannelAsync(resolvePreferredNode(tx0, pm)),
                                         retryPolicyOverride,
                                         expectNotifications)
                                 // Read resulting schema and the rest of the 
response.
@@ -596,13 +648,10 @@ public class ClientTable implements Table {
         }
     }
 
-    private static boolean useDirectMapping(@Nullable ClientTransaction tx0, 
WriteContext ctx, ClientChannel opChannel) {
-        // Fulfilling this condition forces proxy mode.
-        boolean proxy = tx0 == null || tx0.isReadOnly() || ctx.pm == null || 
!opChannel.protocolContext()
-                .allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
-
-        return !proxy && ctx.pm != null && 
ctx.pm.nodeConsistentId().equals(opChannel.protocolContext().clusterNode().name())
-                && tx0.hasCommitPartition();
+    private static boolean isProxy(@Nullable Transaction tx, @Nullable 
PartitionMapping pm, ClientChannel opChannel) {
+        return tx == null || tx.isReadOnly() || pm == null
+                || 
!opChannel.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS, TX_PIGGYBACK)
+                || 
!pm.nodeConsistentId().equals(opChannel.protocolContext().clusterNode().name());
     }
 
     private static CompletableFuture<Void> enlistDirect(
@@ -631,20 +680,32 @@ public class ClientTable implements Table {
             @Nullable T defaultValue,
             boolean responseSchemaRequired,
             WriteContext ctx,
-            @Nullable ClientTransaction tx
+            @Nullable ClientTransaction tx0
     ) {
-        // Use enlistment meta only for remote transactions.
-        if (ctx.enlistmentToken != null) {
-            assert tx != null;
+        ClientMessageUnpacker in1 = in.in();
+        if (ctx.firstReqFut != null) {
+            assert tx0 == null;
+
+            long id = in1.unpackLong();
+            UUID txId = in1.unpackUuid();
+            UUID coordId = in1.unpackUuid();
+            long timeout = in1.unpackLong();
+
+            ClientTransaction tx =
+                    new ClientTransaction(in.clientChannel(), id, 
ctx.readOnly, txId, ctx.pm, coordId, ch.observableTimestamp(), timeout);
+
+            ctx.firstReqFut.complete(tx);
+        } else if (ctx.enlistmentToken != null) { // Use enlistment meta only 
for remote transactions.
+            assert tx0 != null;
             assert ctx.pm != null;
 
             if (in.in().tryUnpackNil()) {
                 // No-op.
-                in.clientChannel().inflights().removeInflight(tx.txId(), null);
+                in.clientChannel().inflights().removeInflight(tx0.txId(), 
null);
 
                 // Finish enlist on first request only.
                 if (ctx.enlistmentToken == 0) {
-                    tx.tryFinishEnlist(ctx.pm, null, 0, true);
+                    tx0.tryFinishEnlist(ctx.pm, null, 0, true);
                 }
             } else {
                 String consistentId = in.in().unpackString();
@@ -652,12 +713,12 @@ public class ClientTable implements Table {
 
                 // Finish enlist on first request only.
                 if (ctx.enlistmentToken == 0) {
-                    tx.tryFinishEnlist(ctx.pm, consistentId, token, false);
+                    tx0.tryFinishEnlist(ctx.pm, consistentId, token, false);
                 }
             }
         }
 
-        int schemaVer = in.in().unpackInt();
+        int schemaVer = in1.unpackInt();
 
         if (!responseSchemaRequired) {
             ensureSchemaLoadedAsync(schemaVer);
@@ -665,7 +726,7 @@ public class ClientTable implements Table {
             return fn.apply(null, in);
         }
 
-        if (in.in().tryUnpackNil()) {
+        if (in1.tryUnpackNil()) {
             ensureSchemaLoadedAsync(schemaVer);
 
             return defaultValue;
@@ -679,7 +740,7 @@ public class ClientTable implements Table {
 
         // Schema is not yet known - request.
         // Retain unpacker - normally it is closed when this method exits.
-        in.in().retain();
+        in1.retain();
         return new IgniteBiTuple<>(in, schemaVer);
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 9ef1fcfecd8..b905b61ef96 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.client.tx;
 
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.client.tx.ClientTransactions.USE_CONFIGURED_TIMEOUT_DEFAULT;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
-import org.apache.ignite.internal.client.PartitionMapping;
+import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
@@ -97,6 +98,10 @@ public class ClientLazyTransaction implements Transaction {
         return options != null && options.readOnly();
     }
 
+    public long timeout() {
+        return options == null ? USE_CONFIGURED_TIMEOUT_DEFAULT : 
options.timeoutMillis();
+    }
+
     /**
      * Gets the node name of the node where the transaction is started. If not 
started yet, returns {@code null}.
      *
@@ -135,41 +140,52 @@ public class ClientLazyTransaction implements Transaction 
{
      *
      * @param tx Transaction.
      * @param ch Channel.
-     * @param sup Partition mapping supplier.
-     * @return Future that will be completed when the transaction is started.
+     *
+     * @return Future that will be completed when the transaction is started 
and first request flag.
      */
-    public static CompletableFuture<ClientTransaction> ensureStarted(
-            @Nullable Transaction tx,
-            ReliableChannel ch,
-            @Nullable Supplier<PartitionMapping> sup
+    public static IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean> 
ensureStarted(
+            Transaction tx,
+            ReliableChannel ch
     ) {
-        if (tx == null) {
-            return nullCompletedFuture();
-        }
+        return ensureStarted(tx, ch, () -> ch.getChannelAsync(null));
+    }
 
+    /**
+     * Ensures that the underlying transaction is actually started on the 
server.
+     *
+     * @param tx Transaction.
+     * @param ch Channel.
+     * @param channelResolver Client channel resolver. {@code null} value 
means skipping explicit tx begin request.
+     *
+     * @return Future that will be completed when the transaction is started 
and first request flag.
+     */
+    public static IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean> 
ensureStarted(
+            Transaction tx,
+            ReliableChannel ch,
+            @Nullable Supplier<CompletableFuture<ClientChannel>> 
channelResolver
+    ) {
         if (!(tx instanceof ClientLazyTransaction)) {
             throw ClientTransaction.unsupportedTxTypeException(tx);
         }
 
-        return ((ClientLazyTransaction) tx).ensureStarted(ch, sup);
+        return ((ClientLazyTransaction) tx).ensureStarted(ch, channelResolver);
     }
 
-    private synchronized CompletableFuture<ClientTransaction> ensureStarted(
+    private synchronized IgniteBiTuple<CompletableFuture<ClientTransaction>, 
Boolean> ensureStarted(
             ReliableChannel ch,
-            @Nullable Supplier<PartitionMapping> sup
+            @Nullable Supplier<CompletableFuture<ClientChannel>> 
channelResolver
     ) {
         var tx0 = tx;
 
         if (tx0 != null) {
-            return tx0;
+            return new IgniteBiTuple<>(tx0, false);
         }
 
-        @Nullable PartitionMapping pm = sup == null ? null : sup.get();
-
-        tx0 = ClientTransactions.beginAsync(ch, pm, options, 
observableTimestamp);
+        tx0 = channelResolver != null ? ClientTransactions.beginAsync(ch, 
options, observableTimestamp, channelResolver)
+                : new CompletableFuture<>();
         tx = tx0;
 
-        return tx0;
+        return new IgniteBiTuple<>(tx0, channelResolver == null);
     }
 
     /**
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 2d9d2d33b96..7cb9fb508d4 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.tx;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ViewUtils.sync;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
@@ -135,9 +136,7 @@ public class ClientTransaction implements Transaction {
         this.timeout = timeout;
 
         if (cpm != null) {
-            // If mapping is known, assign commit partition.
-            // However, we don't require direct connection to a commit 
partition primary replica here because where is a guarantee that
-            // commit partition will be assigned to provided value at the txn 
beginning.
+            // if commit partition is known, we can attempt to do direct 
mappings in this transaction.
             this.commitTableId = cpm.tableId();
             this.commitPartition = cpm.partition();
         } else {
@@ -220,18 +219,22 @@ public class ClientTransaction implements Transaction {
             enlistPartitionLock.writeLock().unlock();
         }
 
-        CompletableFuture<Void> finishFut = 
ch.inflights().finishFuture(txId());
+        boolean enabled = 
ch.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
+        CompletableFuture<Void> finishFut = enabled ? 
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
 
         CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored 
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
             w.out().packLong(id);
-            if (!isReadOnly && 
w.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS)) {
-                w.out().packLong(tracker.get().longValue());
+
+            if (!isReadOnly && enabled) {
                 w.out().packInt(enlisted.size());
-                for (Entry<TablePartitionId, 
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
-                    w.out().packInt(entry.getKey().tableId());
-                    w.out().packInt(entry.getKey().partitionId());
-                    w.out().packString(entry.getValue().getNow(null).get1());
-                    w.out().packLong(entry.getValue().getNow(null).get2());
+                if (!enlisted.isEmpty()) {
+                    for (Entry<TablePartitionId, 
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
+                        w.out().packInt(entry.getKey().tableId());
+                        w.out().packInt(entry.getKey().partitionId());
+                        
w.out().packString(entry.getValue().getNow(null).get1());
+                        w.out().packLong(entry.getValue().getNow(null).get2());
+                    }
+                    w.out().packLong(tracker.get().longValue());
                 }
             }
         }, r -> null));
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index 1b5fbdd3032..7b8ea671b17 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.internal.client.tx;
 
-import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
-import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.client.tx.ClientTransaction.EMPTY;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ViewUtils.sync;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.client.PartitionMapping;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class ClientTransactions implements IgniteTransactions {
     /** 0 timeout is used as a flag to use the configured timeout. */
-    private static final int USE_CONFIGURED_TIMEOUT_DEFAULT = 0;
+    public static final int USE_CONFIGURED_TIMEOUT_DEFAULT = 0;
 
     /** Channel. */
     private final ReliableChannel ch;
@@ -63,32 +63,38 @@ public class ClientTransactions implements 
IgniteTransactions {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Transaction> beginAsync(@Nullable 
TransactionOptions options) {
-        return CompletableFuture.completedFuture(new 
ClientLazyTransaction(ch.observableTimestamp(), options));
+        return completedFuture(new 
ClientLazyTransaction(ch.observableTimestamp(), options));
     }
 
+    /**
+     * Begins the transaction on any node.
+     *
+     * @param ch Reliable channel.
+     * @param options The options.
+     * @param observableTimestamp The timestamp.
+     * @param channelResolver Client channel resolver.
+     *
+     * @return The future.
+     */
     static CompletableFuture<ClientTransaction> beginAsync(
             ReliableChannel ch,
-            @Nullable PartitionMapping pm,
             @Nullable TransactionOptions options,
-            HybridTimestampTracker observableTimestamp
+            HybridTimestampTracker observableTimestamp,
+            Supplier<CompletableFuture<ClientChannel>> channelResolver
     ) {
         boolean readOnly = options != null && options.readOnly();
         long timeout = options == null ? USE_CONFIGURED_TIMEOUT_DEFAULT : 
options.timeoutMillis();
 
         return ch.serviceAsync(
                 ClientOp.TX_BEGIN,
+                ch0 -> nullCompletedFuture(),
                 w -> {
                     w.out().packBoolean(readOnly);
                     w.out().packLong(timeout);
                     w.out().packLong(observableTimestamp.get().longValue());
-                    if (!readOnly && 
w.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS)) {
-                        w.out().packInt(pm == null ? -1 : pm.tableId());
-                        w.out().packInt(pm == null ? -1 : pm.partition());
-                    }
                 },
-                r -> readTx(r, readOnly, pm, observableTimestamp, timeout),
-                pm == null ? null : pm.nodeConsistentId(),
-                null,
+                r -> readTx(r, readOnly, timeout),
+                channelResolver,
                 null,
                 false);
     }
@@ -96,19 +102,12 @@ public class ClientTransactions implements 
IgniteTransactions {
     private static ClientTransaction readTx(
             PayloadInputChannel r,
             boolean isReadOnly,
-            @Nullable PartitionMapping pm,
-            HybridTimestampTracker tracker,
             long timeout
     ) {
         ClientMessageUnpacker in = r.in();
 
         long id = in.unpackLong();
-        if (isReadOnly || 
!r.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS)) {
-            return new ClientTransaction(r.clientChannel(), id, isReadOnly, 
EMPTY, null, EMPTY, null, timeout);
-        } else {
-            UUID txId = in.unpackUuid();
-            UUID coordId = in.unpackUuid();
-            return new ClientTransaction(r.clientChannel(), id, isReadOnly, 
txId, pm, coordId, tracker, timeout);
-        }
+
+        return new ClientTransaction(r.clientChannel(), id, isReadOnly, EMPTY, 
null, EMPTY, null, timeout);
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index ede65c51059..2aa1124553c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -138,7 +138,7 @@ public class ConnectionTest extends AbstractClientTest {
         FakeIgnite ignite = new FakeIgnite(nodeName);
 
         try (TestServer testServer =
-                new TestServer(0, ignite, null, null, nodeName, 
UUID.randomUUID(), null, null, null, false)) {
+                new TestServer(0, ignite, null, null, nodeName, 
UUID.randomUUID(), null, null, null, false, null)) {
 
             Builder clientBuilder = IgniteClient.builder()
                     .addresses("127.0.0.1:" + testServer.port())
@@ -166,7 +166,7 @@ public class ConnectionTest extends AbstractClientTest {
         FakeIgnite ignite = new FakeIgnite(nodeName);
 
         try (TestServer testServer =
-                new TestServer(0, ignite, null, null, nodeName, 
UUID.randomUUID(), null, null, null, false)) {
+                new TestServer(0, ignite, null, null, nodeName, 
UUID.randomUUID(), null, null, null, false, null)) {
 
             Builder clientBuilder = IgniteClient.builder()
                     .addresses("127.0.0.1:" + testServer.port())
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
new file mode 100644
index 00000000000..dcb27b1d457
--- /dev/null
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that after handshake client and servers mutual features are in 
expected state.
+ */
+public class FeatureCompatibilityTest extends BaseIgniteAbstractTest {
+    private TestServer testServer;
+
+    private FakeIgnite ignite;
+
+    private IgniteClient client;
+
+    private void startServer(@Nullable BitSet features) {
+        TestHybridClock clock = new TestHybridClock(System::currentTimeMillis);
+
+        ignite = new FakeIgnite("server-1");
+        testServer = new TestServer(0, ignite, reqId -> false, null, 
"server-1", UUID.randomUUID(), null, null, clock, true, features);
+
+        client = IgniteClient.builder().addresses("127.0.0.1:" + 
testServer.port()).build();
+    }
+
+    private void stopServer() throws Exception {
+        closeAll(client, testServer);
+    }
+
+    @Test
+    public void testDirectMappingEnabled() throws Exception {
+        startServer(null);
+
+        try {
+            ReliableChannel ch = ((TcpIgniteClient) client).channel();
+
+            ClientChannel ch0 = ch.getChannelAsync(null).join();
+
+            
assertTrue(ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, 
TX_DELAYED_ACKS, TX_PIGGYBACK));
+        } finally {
+            stopServer();
+        }
+    }
+
+    @Test
+    public void testDirectMappingDisabled() throws Exception {
+        BitSet features = new BitSet(ProtocolBitmaskFeature.values().length);
+        features.set(TX_DIRECT_MAPPING.featureId());
+        startServer(features);
+
+        try {
+            ReliableChannel ch = ((TcpIgniteClient) client).channel();
+
+            ClientChannel ch0 = ch.getChannelAsync(null).join();
+
+            
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING));
+            
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DELAYED_ACKS));
+            
assertFalse(ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK));
+        } finally {
+            stopServer();
+        }
+    }
+}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index 09275b81a49..2b8ee6d2953 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -51,7 +51,7 @@ public class ObservableTimestampPropagationTest extends 
BaseIgniteAbstractTest {
         TestHybridClock clock = new 
TestHybridClock(currentServerTimestamp::get);
 
         ignite = new FakeIgnite("server-2");
-        testServer = new TestServer(0, ignite, null, null, "server-2", 
UUID.randomUUID(), null, null, clock, true);
+        testServer = new TestServer(0, ignite, null, null, "server-2", 
UUID.randomUUID(), null, null, clock, true, null);
 
         client = IgniteClient.builder().addresses("127.0.0.1:" + 
testServer.port()).build();
     }
@@ -74,27 +74,27 @@ public class ObservableTimestampPropagationTest extends 
BaseIgniteAbstractTest {
 
         // RW TX does not propagate timestamp.
         var rwTx = client.transactions().begin();
-        ClientLazyTransaction.ensureStarted(rwTx, ch, null).join();
+        ClientLazyTransaction.ensureStarted(rwTx, ch).get1().join();
 
         // RO TX propagates timestamp.
         var roTx = client.transactions().begin(roOpts);
-        ClientLazyTransaction.ensureStarted(roTx, ch, null).join();
+        ClientLazyTransaction.ensureStarted(roTx, ch).get1().join();
         assertEquals(1, lastObservableTimestamp(ch));
 
         // Increase timestamp on server - client does not know about it 
initially.
         currentServerTimestamp.set(11);
-        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, 
null).join();
+        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), 
ch).get1().join();
         assertEquals(1, lastObservableTimestamp(ch));
 
         // Subsequent RO TX propagates latest known timestamp.
         client.tables().tables();
-        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, 
null).join();
+        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), 
ch).get1().join();
         assertEquals(11, lastObservableTimestamp(ch));
 
         // Smaller timestamp from server is ignored by client.
         currentServerTimestamp.set(9);
-        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, 
null).join();
-        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch, 
null).join();
+        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), 
ch).get1().join();
+        
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), 
ch).get1().join();
         assertEquals(11, lastObservableTimestamp(ch));
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 86743153c48..e2f6d670106 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -104,7 +104,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest 
{
 
         try (var client = getClient(plc)) {
             Transaction tx = client.transactions().begin();
-            ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient) 
client).channel(), null).join();
+            ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient) 
client).channel()).get1().join();
 
             assertThrows(IgniteClientConnectionException.class, tx::commit);
             assertEquals(0, plc.invocations.size());
@@ -167,7 +167,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest 
{
         try (var client = getClient(plc)) {
             RecordView<Tuple> recView = 
client.tables().table("t").recordView();
             Transaction tx = client.transactions().begin();
-            ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient) 
client).channel(), null).join();
+            ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient) 
client).channel()).get1().join();
 
             var ex = assertThrows(IgniteException.class, () -> recView.get(tx, 
Tuple.create().set("id", 1)));
             assertThat(ex.getMessage(), containsString("Transaction context 
has been lost due to connection errors."));
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index 36036ce631d..55df8413910 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -110,6 +110,9 @@ public class TestClientHandlerModule implements 
IgniteComponent {
     /** Configuration of the client connector. */
     private final ClientConnectorConfiguration clientConnectorConfiguration;
 
+    /** Features set. */
+    private final BitSet features;
+
     /**
      * Constructor.
      *
@@ -124,6 +127,7 @@ public class TestClientHandlerModule implements 
IgniteComponent {
      * @param clock Clock.
      * @param placementDriver Placement driver.
      * @param clientConnectorConfiguration Configuration of the client 
connector.
+     * @param features Features.
      */
     public TestClientHandlerModule(
             Ignite ignite,
@@ -136,7 +140,8 @@ public class TestClientHandlerModule implements 
IgniteComponent {
             AuthenticationManager authenticationManager,
             HybridClock clock,
             PlacementDriver placementDriver,
-            ClientConnectorConfiguration clientConnectorConfiguration
+            ClientConnectorConfiguration clientConnectorConfiguration,
+            @Nullable BitSet features
     ) {
         assert ignite != null;
         assert bootstrapFactory != null;
@@ -153,6 +158,7 @@ public class TestClientHandlerModule implements 
IgniteComponent {
         this.clock = clock;
         this.placementDriver = placementDriver;
         this.clientConnectorConfiguration = clientConnectorConfiguration;
+        this.features = features;
     }
 
     /** {@inheritDoc} */
@@ -212,10 +218,20 @@ public class TestClientHandlerModule implements 
IgniteComponent {
 
         ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
 
-        BitSet features = BitSet.valueOf(new 
long[]{ThreadLocalRandom.current().nextLong()});
-        features.set(ProtocolBitmaskFeature.TX_DIRECT_MAPPING.featureId());
-        features.set(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB.featureId());
-        features.set(ProtocolBitmaskFeature.TX_DELAYED_ACKS.featureId());
+        BitSet features;
+
+        if (this.features == null) {
+            features = BitSet.valueOf(new 
long[]{ThreadLocalRandom.current().nextLong()});
+            features.set(ProtocolBitmaskFeature.TX_DIRECT_MAPPING.featureId());
+            
features.set(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB.featureId());
+            features.set(ProtocolBitmaskFeature.TX_DELAYED_ACKS.featureId());
+            features.set(ProtocolBitmaskFeature.TX_PIGGYBACK.featureId());
+        } else {
+            features = new BitSet(ProtocolBitmaskFeature.values().length);
+            for (int i = this.features.nextSetBit(0); i != -1; i = 
this.features.nextSetBit(i + 1)) {
+                features.set(i);
+            }
+        }
 
         bootstrap.childHandler(new ChannelInitializer<>() {
                     @Override
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java 
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 34194725490..dfb66ff1e75 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
@@ -150,7 +151,8 @@ public class TestServer implements AutoCloseable {
                 securityConfiguration,
                 port,
                 null,
-                true
+                true,
+                null
         );
     }
 
@@ -170,7 +172,8 @@ public class TestServer implements AutoCloseable {
             @Nullable SecurityConfiguration securityConfiguration,
             @Nullable Integer port,
             @Nullable HybridClock clock,
-            boolean enableRequestHandling
+            boolean enableRequestHandling,
+            @Nullable BitSet features
     ) {
         ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
 
@@ -262,7 +265,8 @@ public class TestServer implements AutoCloseable {
                         authenticationManager,
                         clock,
                         ignite.placementDriver(),
-                        clientConnectorConfiguration)
+                        clientConnectorConfiguration,
+                        features)
                 : new ClientHandlerModule(
                         ignite.queryEngine(),
                         (IgniteTablesInternal) ignite.tables(),
diff --git 
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
 
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index 6816ef2c891..3cf582e5785 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client;
 import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.hlc.HybridTimestampTracker.EMPTY_TS_PROVIDER;
+import static 
org.apache.ignite.internal.hlc.HybridTimestampTracker.emptyTracker;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -220,7 +221,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
 
         tx.commit();
 
-        WriteContext wc = new WriteContext();
+        WriteContext wc = new WriteContext(emptyTracker());
         wc.pm = pm;
 
         try {
@@ -251,7 +252,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
 
         tx.rollback();
 
-        WriteContext wc = new WriteContext();
+        WriteContext wc = new WriteContext(emptyTracker());
         wc.pm = pm;
 
         try {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index e470017f652..07497f463dd 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.runner.app.client;
 
+import static java.util.Collections.emptyList;
 import static java.util.Comparator.comparing;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -467,8 +468,9 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
     static List<Tuple> generateKeysForNode(int start, int count, 
Map<Partition, ClusterNode> map, ClusterNode clusterNode, Table table) {
         String clusterNodeName = clusterNode.name();
-        assert map.values().stream().anyMatch(x -> Objects.equals(x.name(), 
clusterNodeName)) :
-                "No partitions for node: " + clusterNodeName;
+        if (map.values().stream().noneMatch(x -> Objects.equals(x.name(), 
clusterNodeName))) {
+            return emptyList();
+        }
 
         List<Tuple> keys = new ArrayList<>();
         PartitionManager partitionManager = table.partitionManager();
@@ -755,6 +757,38 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         }
     }
 
+    @Test
+    void testExplicitReadOnlyTransaction() {
+        ClientTable table = (ClientTable) table();
+
+        KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
+
+        // Load partition map to ensure all entries are directly mapped.
+        Map<Partition, ClusterNode> map = 
table.partitionManager().primaryReplicasAsync().join();
+
+        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+        List<Tuple> tuples0 = generateKeysForNode(600, 20, map, 
server0.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples1 = generateKeysForNode(600, 20, map, 
server1.clusterService().topologyService().localMember(), table);
+
+        Tuple k1 = tuples0.get(0);
+        Tuple v1 = val(tuples0.get(0).intValue(0) + "");
+
+        Tuple k2 = tuples1.get(1);
+        Tuple v2 = val(tuples1.get(1).intValue(0) + "");
+
+        kvView.put(null, k1, v1);
+        kvView.put(null, k2, v2);
+
+        Transaction tx = client().transactions().begin(new 
TransactionOptions().readOnly(true));
+
+        assertTrue(Tuple.equals(v1, kvView.get(tx, k1)));
+        assertTrue(Tuple.equals(v2, kvView.get(tx, k2)));
+
+        tx.commit();
+    }
+
     private KeyValueView<Integer, String> kvView() {
         return table().keyValueView(Mapper.of(Integer.class), 
Mapper.of(String.class));
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
index a2c4d18c47f..fd9458925ba 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.partition.Partition;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -43,7 +42,6 @@ import org.junit.jupiter.api.Test;
  */
 public class ItThinClientTransactionsWithReplicasTest extends 
ItAbstractThinClientTest {
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-25431";)
     void testStaleMapping() {
         Map<Partition, ClusterNode> map = 
table().partitionManager().primaryReplicasAsync().orTimeout(9, 
TimeUnit.SECONDS).join();
 
@@ -57,6 +55,10 @@ public class ItThinClientTransactionsWithReplicasTest 
extends ItAbstractThinClie
         List<Tuple> tuples1 = generateKeysForNode(100, 1, map, 
server1.clusterService().topologyService().localMember(), table);
         List<Tuple> tuples2 = generateKeysForNode(100, 1, map, 
server2.clusterService().topologyService().localMember(), table);
 
+        if (tuples0.isEmpty() || tuples1.isEmpty() || tuples2.isEmpty()) {
+            return; // Skip the test if assignments are bad.
+        }
+
         Transaction tx0 = client().transactions().begin();
 
         KeyValueView<Tuple, Tuple> view = table.keyValueView();

Reply via email to