This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 372b1ba8428 IGNITE-27202 Fix an issue a wrong commit partition ID for 
directly mapped transactions. (#7105)
372b1ba8428 is described below

commit 372b1ba842873a1340523d168659d841f7c0255a
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Nov 28 17:35:44 2025 +0200

    IGNITE-27202 Fix an issue a wrong commit partition ID for directly mapped 
transactions. (#7105)
---
 .../handler/ClientInboundMessageHandler.java       |   2 +-
 .../requests/sql/ClientSqlExecuteBatchRequest.java |  14 ++-
 .../requests/sql/ClientSqlExecuteRequest.java      |  17 ++-
 .../sql/ClientSqlQueryMetadataRequest.java         |   8 +-
 .../handler/requests/table/ClientTableCommon.java  | 133 +++++++++++++++------
 .../requests/table/ClientTupleRequestBase.java     |  16 ++-
 .../requests/table/ClientTuplesRequestBase.java    |   8 +-
 .../apache/ignite/client/fakes/FakeTxManager.java  |  14 ++-
 .../replicator/PartitionReplicaListener.java       |   6 +
 .../replication/PartitionReplicaListenerTest.java  |  27 ++---
 .../org/apache/ignite/internal/tx/TxManager.java   |   4 +-
 .../tx/impl/RemoteReadWriteTransaction.java        |   8 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  12 +-
 13 files changed, 184 insertions(+), 85 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 3dc893edaf6..01dea112d25 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -964,7 +964,7 @@ public class ClientInboundMessageHandler
                 return ClientSqlExecuteRequest.process(
                         partitionOperationsExecutor, in, requestId, 
cancelHandles, queryProcessor, resources, metrics, tsTracker,
                         clientContext.hasFeature(SQL_PARTITION_AWARENESS), 
clientContext.hasFeature(SQL_DIRECT_TX_MAPPING), txManager,
-                        clockService, notificationSender(requestId), 
resolveCurrentUsername(),
+                        igniteTables, clockService, 
notificationSender(requestId), resolveCurrentUsername(),
                         clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT)
                 );
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index fd7429b568b..03c353061b0 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.client.handler.requests.sql;
 
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -65,7 +64,16 @@ public class ClientSqlExecuteBatchRequest {
         CancelHandle cancelHandle = CancelHandle.create();
         cancelHandleMap.put(requestId, cancelHandle);
 
-        InternalTransaction tx = readTx(in, tsTracker, resources, null, null, 
null);
+        CompletableFuture<InternalTransaction> txFut = readTx(
+                in,
+                tsTracker,
+                resources,
+                null,
+                null,
+                null,
+                null
+        );
+
         ClientSqlProperties props = new ClientSqlProperties(in, false);
         String statement = in.unpackString();
         BatchedArguments arguments = readArgs(in);
@@ -73,7 +81,7 @@ public class ClientSqlExecuteBatchRequest {
         HybridTimestamp clientTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
         tsTracker.update(clientTs);
 
-        return nullCompletedFuture().thenComposeAsync(none -> {
+        return txFut.thenComposeAsync(tx -> {
             return IgniteSqlImpl.executeBatchCore(
                             sql,
                             tsTracker,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 418c9238d5e..42390807490 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +49,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.IgniteTables;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -92,6 +92,7 @@ public class ClientSqlExecuteRequest {
             boolean sqlPartitionAwarenessSupported,
             boolean sqlDirectTxMappingSupported,
             TxManager txManager,
+            IgniteTables tables,
             ClockService clockService,
             NotificationSender notificationSender,
             @Nullable String username,
@@ -105,7 +106,17 @@ public class ClientSqlExecuteRequest {
         }
 
         long[] resIdHolder = {0};
-        InternalTransaction tx = readTx(in, timestampTracker, resources, 
txManager, notificationSender, resIdHolder);
+
+        CompletableFuture<InternalTransaction> txFut = readTx(
+                in,
+                timestampTracker,
+                resources,
+                txManager,
+                tables,
+                notificationSender,
+                resIdHolder
+        );
+
         ClientSqlProperties props = new ClientSqlProperties(in, 
sqlMultistatementsSupported);
         String statement = in.unpackString();
         Object[] arguments = readArgsNotNull(in);
@@ -115,7 +126,7 @@ public class ClientSqlExecuteRequest {
 
         boolean includePartitionAwarenessMeta = sqlPartitionAwarenessSupported 
&& in.unpackBoolean();
 
-        return nullCompletedFuture().thenComposeAsync(none -> executeAsync(
+        return txFut.thenComposeAsync(tx -> executeAsync(
                 tx,
                 sql,
                 timestampTracker,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 4437b70536b..49db22c22bb 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.client.handler.requests.sql;
 
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -30,6 +29,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.SqlProperties;
 import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
+import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
  * Client SQL request for the parameter metadata.
@@ -51,15 +51,15 @@ public class ClientSqlQueryMetadataRequest {
             ClientResourceRegistry resources,
             HybridTimestampTracker tsTracker
     ) {
-        var tx = readTx(in, tsTracker, resources, null, null, null);
+        CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, 
resources, null, null, null, null);
 
         String schema = in.unpackString();
         String query = in.unpackString();
 
         SqlProperties properties = new SqlProperties().defaultSchema(schema);
 
-        return nullCompletedFuture()
-                .thenComposeAsync(none -> 
processor.prepareSingleAsync(properties, tx, query)
+        return txFut
+                .thenComposeAsync(tx -> 
processor.prepareSingleAsync(properties, tx, query)
                 .thenApply(meta -> out -> writeMeta(out, meta)), 
operationExecutor);
     }
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 60e17fb31c4..c3301a7fb86 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.client.handler.requests.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
 import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
 import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
 
@@ -46,7 +48,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.schema.SchemaAware;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -172,7 +174,8 @@ public class ClientTableCommon {
     static void writeTuples(
             ClientMessagePacker packer,
             Collection<Tuple> tuples,
-            SchemaRegistry schemaRegistry) {
+            SchemaRegistry schemaRegistry
+    ) {
         writeTuples(packer, tuples, TuplePart.KEY_AND_VAL, schemaRegistry);
     }
 
@@ -247,7 +250,12 @@ public class ClientTableCommon {
     }
 
     public static CompletableFuture<Tuple> readTuple(
-            int schemaId, BitSet noValueSet, byte[] tupleBytes, 
TableViewInternal table, boolean keyOnly) {
+            int schemaId,
+            BitSet noValueSet,
+            byte[] tupleBytes,
+            TableViewInternal table,
+            boolean keyOnly
+    ) {
         return readSchema(schemaId, table).thenApply(schema -> 
readTuple(noValueSet, tupleBytes, keyOnly, schema));
     }
 
@@ -301,7 +309,7 @@ public class ClientTableCommon {
             // This method can return a table that is being stopped, but it's 
not a problem - any operation on such table will fail.
             TableViewInternal cachedTable = 
tablesInternal.cachedTable(tableId);
             if (cachedTable != null) {
-                return CompletableFuture.completedFuture(cachedTable);
+                return completedFuture(cachedTable);
             }
 
             return tablesInternal.tableAsync(tableId)
@@ -326,7 +334,11 @@ public class ClientTableCommon {
      * @param req Request.
      */
     static void writeTxMeta(
-            ClientMessagePacker out, HybridTimestampTracker tsTracker, 
@Nullable ClockService clockService, ClientTupleRequestBase req) {
+            ClientMessagePacker out,
+            HybridTimestampTracker tsTracker,
+            @Nullable ClockService clockService,
+            ClientTupleRequestBase req
+    ) {
         writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
     }
 
@@ -339,7 +351,11 @@ public class ClientTableCommon {
      * @param req Request.
      */
     static void writeTxMeta(
-            ClientMessagePacker out, HybridTimestampTracker tsTracker, 
@Nullable ClockService clockService, ClientTuplesRequestBase req) {
+            ClientMessagePacker out,
+            HybridTimestampTracker tsTracker,
+            @Nullable ClockService clockService,
+            ClientTuplesRequestBase req
+    ) {
         writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
     }
 
@@ -352,8 +368,13 @@ public class ClientTableCommon {
      * @param tx Transaction.
      * @param resourceId Resource id.
      */
-    public static void writeTxMeta(ClientMessagePacker out, 
HybridTimestampTracker tsTracker, @Nullable ClockService clockService,
-            InternalTransaction tx, long resourceId) {
+    public static void writeTxMeta(
+            ClientMessagePacker out,
+            HybridTimestampTracker tsTracker,
+            @Nullable ClockService clockService,
+            InternalTransaction tx,
+            long resourceId
+    ) {
         if (resourceId != 0) {
             // Resource id is assigned on a first request in direct mode.
             out.packLong(resourceId);
@@ -393,14 +414,25 @@ public class ClientTableCommon {
      * @param resourceIdHolder Resource id holder.
      * @return Transaction, if present, or null.
      */
-    public static @Nullable InternalTransaction readTx(
+    public static CompletableFuture<@Nullable InternalTransaction> readTx(
             ClientMessageUnpacker in,
             HybridTimestampTracker tsUpdater,
             ClientResourceRegistry resources,
             @Nullable TxManager txManager,
+            @Nullable IgniteTables tables,
             @Nullable NotificationSender notificationSender,
-            long[] resourceIdHolder) {
-        return readTx(in, tsUpdater, resources, txManager, notificationSender, 
resourceIdHolder, EnumSet.noneOf(RequestOptions.class));
+            long[] resourceIdHolder
+    ) {
+        return readTx(
+                in,
+                tsUpdater,
+                resources,
+                txManager,
+                tables,
+                notificationSender,
+                resourceIdHolder,
+                EnumSet.noneOf(RequestOptions.class)
+        );
     }
 
     /**
@@ -415,21 +447,26 @@ public class ClientTableCommon {
      * @param options Request options. Defines how a request is processed.
      * @return Transaction, if present, or null.
      */
-    public static @Nullable InternalTransaction readTx(
+    public static CompletableFuture<@Nullable InternalTransaction> readTx(
             ClientMessageUnpacker in,
             HybridTimestampTracker tsUpdater,
             ClientResourceRegistry resources,
             @Nullable TxManager txManager,
+            @Nullable IgniteTables tables,
             @Nullable NotificationSender notificationSender,
             long[] resourceIdHolder,
-            EnumSet<RequestOptions> options) {
+            EnumSet<RequestOptions> options
+    ) {
         if (in.tryUnpackNil()) {
-            return null;
+            return nullCompletedFuture();
         }
 
         try {
             long id = in.unpackLong();
+
             if (id == TX_ID_FIRST_DIRECT) {
+                assert txManager != null : "Transaction manager must be 
specified to process directly mapped requests.";
+
                 // This is first mapping request, which piggybacks transaction 
creation.
                 long observableTs = in.unpackLong();
 
@@ -457,29 +494,43 @@ public class ClientTableCommon {
                 // Attach resource id only on first direct request.
                 resourceIdHolder[0] = resources.put(new ClientResource(tx, 
tx::rollbackAsync));
 
-                return tx;
+                return completedFuture(tx);
             } else if (id == TX_ID_DIRECT) {
+                assert txManager != null : "Transaction manager must be 
specified to process directly mapped requests.";
+                assert tables != null : "Tables manager must be specified to 
process directly mapped requests.";
+
                 // This is direct request mapping.
                 long token = in.unpackLong();
                 UUID txId = in.unpackUuid();
-                int commitTableId = in.unpackInt();
+
+                // TODO https://issues.apache.org/jira/browse/IGNITE-25017
+                // partition awareness feature should be reworked to use zone 
ids.
+                int tableId = in.unpackInt();
                 int commitPart = in.unpackInt();
+
                 UUID coord = in.unpackUuid();
                 long timeout = in.unpackLong();
 
-                InternalTransaction remote = txManager.beginRemote(txId, new 
TablePartitionId(commitTableId, commitPart),
-                        coord, token, timeout, err -> {
-                            // Will be called for write txns.
-                            notificationSender.sendNotification(w -> 
w.packUuid(txId), err, NULL_HYBRID_TIMESTAMP);
+                return readTableAsync(tableId, tables)
+                        .thenApply(table -> {
+                            InternalTransaction remote = 
txManager.beginRemote(txId, new ZonePartitionId(table.zoneId(), commitPart),
+                                    coord, token, timeout, err -> {
+                                            // Need clarification: why might 
this listener be null when processing SQL queries?
+                                            // See 
ClientSqlExecuteRequest#process
+                                            if (notificationSender != null) {
+                                                // Will be called for write 
txns.
+                                                
notificationSender.sendNotification(w -> w.packUuid(txId), err, 
NULL_HYBRID_TIMESTAMP);
+                                            }
+                                    });
+
+                            // Remote transaction will be synchronously rolled 
back if the timeout has exceeded.
+                            if (remote.isRolledBackWithTimeoutExceeded()) {
+                                throw new 
TransactionException(TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR,
+                                        "Transaction is already finished [tx=" 
+ remote + "].");
+                            }
+
+                            return remote;
                         });
-
-                // Remote transaction will be synchronously rolled back if the 
timeout has exceeded.
-                if (remote.isRolledBackWithTimeoutExceeded()) {
-                    throw new 
TransactionException(TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR,
-                            "Transaction is already finished [tx=" + remote + 
"].");
-                }
-
-                return remote;
             }
 
             var tx = resources.get(id).get(InternalTransaction.class);
@@ -490,29 +541,33 @@ public class ClientTableCommon {
                 tsUpdater.update(tx.readTimestamp()); // TODO 
https://issues.apache.org/jira/browse/IGNITE-24592
             }
 
-            return tx;
+            return completedFuture(tx);
         } catch (IgniteInternalCheckedException e) {
             throw new IgniteException(e.traceId(), e.code(), e.getMessage(), 
e);
         }
     }
 
-    static InternalTransaction readOrStartImplicitTx(
+    static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
             ClientMessageUnpacker in,
             HybridTimestampTracker readTs,
             ClientResourceRegistry resources,
             TxManager txManager,
+            IgniteTables tables,
             EnumSet<RequestOptions> options,
             @Nullable NotificationSender notificationSender,
-            long[] resourceIdHolder) {
-        InternalTransaction tx = readTx(in, readTs, resources, txManager, 
notificationSender, resourceIdHolder, options);
-
-        if (tx == null) {
-            // Implicit transactions do not use an observation timestamp 
because RW never depends on it, and implicit RO is always direct.
-            // The direct transaction uses a current timestamp on the primary 
replica by definition.
-            tx = startImplicitTx(readTs, txManager, 
options.contains(RequestOptions.READ_ONLY));
-        }
+            long[] resourceIdHolder
+    ) {
+        return readTx(in, readTs, resources, txManager, tables, 
notificationSender, resourceIdHolder, options)
+                .thenApply(tx -> {
+                    if (tx == null) {
+                        // Implicit transactions do not use an observation 
timestamp because RW never depends on it,
+                        // and implicit RO is always direct.
+                        // The direct transaction uses a current timestamp on 
the primary replica by definition.
+                        tx = startImplicitTx(readTs, txManager, 
options.contains(RequestOptions.READ_ONLY));
+                    }
 
-        return tx;
+                    return tx;
+                });
     }
 
     /**
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
index 9787ef0a055..eefed432cd8 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
@@ -97,7 +97,16 @@ class ClientTupleRequestBase {
 
         long[] resIdHolder = {0};
 
-        InternalTransaction tx = readOrStartImplicitTx(in, tsTracker, 
resources, txManager, options, notificationSender, resIdHolder);
+        CompletableFuture<InternalTransaction> txFut = readOrStartImplicitTx(
+                in,
+                tsTracker,
+                resources,
+                txManager,
+                tables,
+                options,
+                notificationSender,
+                resIdHolder
+        );
 
         int schemaId = in.unpackInt();
 
@@ -107,7 +116,7 @@ class ClientTupleRequestBase {
         BitSet noValueSet2 = options.contains(READ_SECOND_TUPLE) ? 
in.unpackBitSet() : null;
         byte[] tupleBytes2 = options.contains(READ_SECOND_TUPLE) ? 
in.readBinary() : null;
 
-        return readTableAsync(tableId, tables)
+        return txFut.thenCompose(tx -> readTableAsync(tableId, tables)
                 .thenCompose(table -> ClientTableCommon.readSchema(schemaId, 
table)
                         .thenApply(schema -> {
                             var tuple = readTuple(noValueSet, tupleBytes, 
options.contains(KEY_ONLY), schema);
@@ -115,7 +124,6 @@ class ClientTupleRequestBase {
                                     ? readTuple(noValueSet2, tupleBytes2, 
options.contains(KEY_ONLY), schema) : null;
 
                             return new ClientTupleRequestBase(tx, table, 
tuple, tuple2, resIdHolder[0]);
-                        }));
-
+                        })));
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
index cf3ad63b583..c0dbcb6d0d6 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
@@ -81,8 +81,8 @@ class ClientTuplesRequestBase {
 
         long[] resIdHolder = {0};
 
-        InternalTransaction tx =
-                readOrStartImplicitTx(in, tsTracker, resources, txManager, 
options, notificationSender, resIdHolder);
+        CompletableFuture<InternalTransaction> txFut =
+                readOrStartImplicitTx(in, tsTracker, resources, txManager, 
tables, options, notificationSender, resIdHolder);
 
         int schemaId = in.unpackInt();
 
@@ -96,7 +96,7 @@ class ClientTuplesRequestBase {
             tupleBytes[i] = in.readBinary();
         }
 
-        return readTableAsync(tableId, tables)
+        return txFut.thenCompose(tx -> readTableAsync(tableId, tables)
                 .thenCompose(table -> ClientTableCommon.readSchema(schemaId, 
table)
                         .thenApply(schema -> {
                             var tuples = new ArrayList<Tuple>(count);
@@ -106,6 +106,6 @@ class ClientTuplesRequestBase {
                             }
 
                             return new ClientTuplesRequestBase(tx, table, 
tuples, resIdHolder[0]);
-                        }));
+                        })));
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 452fe0f817e..e538cc4a9eb 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.LockManager;
@@ -103,7 +103,7 @@ public class FakeTxManager implements TxManager {
             }
 
             @Override
-            public TablePartitionId commitPartition() {
+            public ReplicationGroupId commitPartition() {
                 return null;
             }
 
@@ -289,8 +289,14 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public InternalTransaction beginRemote(UUID txId, TablePartitionId 
commitPartId, UUID coord, long token, long timeout,
-            Consumer<Throwable> cb) {
+    public InternalTransaction beginRemote(
+            UUID txId,
+            ZonePartitionId commitPartId,
+            UUID coord,
+            long token,
+            long timeout,
+            Consumer<Throwable> cb
+    ) {
         return null;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 8832e28c220..5f25e9a307f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -2826,6 +2826,12 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
         assert commitPartitionId != null : "Commit partition is null [type=" + 
request.requestType() + ']';
 
+        // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this 
assert.
+        assert commitPartitionId instanceof ZonePartitionId : "Commit 
partition id must be zone aware ["
+                + "requestType=" + request.requestType()
+                + ", commitPartitionId=" + commitPartitionId
+                + ", class=" + commitPartitionId.getClass().getSimpleName() + 
']';
+
         switch (request.requestType()) {
             case RW_DELETE_EXACT: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 459626831c6..2a8e878b8b5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -172,7 +172,6 @@ import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
 import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -440,7 +439,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .schemaId(1)
                 .primaryKeyIndexId(2)
                 .name(TABLE_NAME)
-                .zoneId(1)
+                .zoneId(ZONE_ID)
                 .newColumns(columns)
                 .primaryKeyColumns(IntList.of(0, 1))
                 .storageProfile(CatalogService.DEFAULT_STORAGE_PROFILE)
@@ -820,7 +819,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         var rowId = new RowId(PART_ID);
 
         pkStorage().put(testBinaryRow, rowId);
-        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID, 
PART_ID);
         testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
@@ -839,7 +838,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         txState = COMMITTED;
 
         pkStorage().put(testBinaryRow, rowId);
-        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID, 
PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, 
localNode.id(), commitPartitionId, clock.now(), null, null));
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
@@ -857,7 +856,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         var rowId = new RowId(PART_ID);
 
         pkStorage().put(testBinaryRow, rowId);
-        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID, 
PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, 
localNode.id(), commitPartitionId, null, null, null));
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
@@ -876,7 +875,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         txState = ABORTED;
 
         pkStorage().put(testBinaryRow, rowId);
-        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID, 
PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, 
localNode.id(), commitPartitionId, null, null, null));
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
@@ -900,7 +899,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                     new BinaryTupleBuilder(1).appendInt(indexedVal).build());
             BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
 
-            testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID, 
PART_ID);
+            testMvPartitionStorage.addWrite(rowId, storeRow, txId, ZONE_ID, 
PART_ID);
             sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue, 
rowId));
             testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
         });
@@ -1028,7 +1027,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                     new BinaryTupleBuilder(1).appendInt(indexedVal).build());
             BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
 
-            testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID, 
PART_ID);
+            testMvPartitionStorage.addWrite(rowId, storeRow, txId, ZONE_ID, 
PART_ID);
             sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue, 
rowId));
             testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
         });
@@ -1141,7 +1140,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                     new BinaryTupleBuilder(1).appendInt(indexedVal).build());
             BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
 
-            testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID, 
PART_ID);
+            testMvPartitionStorage.addWrite(rowId, storeRow, txId, ZONE_ID, 
PART_ID);
             hashIndexStorage.storage().put(new IndexRowImpl(indexedValue, 
rowId));
             testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
         });
@@ -1389,10 +1388,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         );
     }
 
-    private static TablePartitionIdMessage commitPartitionId() {
-        return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
+    private static ZonePartitionIdMessage commitPartitionId() {
+        return REPLICA_MESSAGES_FACTORY.zonePartitionIdMessage()
                 .partitionId(PART_ID)
-                .tableId(TABLE_ID)
+                .zoneId(ZONE_ID)
                 .build();
     }
 
@@ -1566,7 +1565,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             // This is needed to check that this row will be skipped by RO tx 
and it will see the data anyway.
             // TODO https://issues.apache.org/jira/browse/IGNITE-18767 after 
this, the following check may be not needed.
             RowId emptyRowId = new RowId(PART_ID, new UUID(Long.MIN_VALUE, 
Long.MIN_VALUE));
-            testMvPartitionStorage.addWrite(emptyRowId, null, tx1, TABLE_ID, 
PART_ID);
+            testMvPartitionStorage.addWrite(emptyRowId, null, tx1, ZONE_ID, 
PART_ID);
 
             if (committed) {
                 testMvPartitionStorage.commitWrite(emptyRowId, clock.now(), 
tx1);
@@ -1701,7 +1700,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         );
 
         pkStorage().put(futureSchemaVersionRow, rowId);
-        testMvPartitionStorage.addWrite(rowId, futureSchemaVersionRow, 
futureSchemaVersionTxId, TABLE_ID, PART_ID);
+        testMvPartitionStorage.addWrite(rowId, futureSchemaVersionRow, 
futureSchemaVersionTxId, ZONE_ID, PART_ID);
         sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue, 
rowId));
         testMvPartitionStorage.commitWrite(rowId, clock.now(), 
futureSchemaVersionTxId);
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index bc56cf8e29b..34e1192081e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
 import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
 import org.jetbrains.annotations.Nullable;
@@ -117,7 +117,7 @@ public interface TxManager extends IgniteComponent {
      *
      * @return Remote transaction.
      */
-    InternalTransaction beginRemote(UUID txId, TablePartitionId commitPartId, 
UUID coord, long token, long timeout, Consumer<Throwable> cb);
+    InternalTransaction beginRemote(UUID txId, ZonePartitionId commitPartId, 
UUID coord, long token, long timeout, Consumer<Throwable> cb);
 
     /**
      * Returns a transaction state meta.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
index 2247e8ab3ad..c72e58b282f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
@@ -25,7 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -42,7 +42,7 @@ public abstract class RemoteReadWriteTransaction implements 
InternalTransaction
     private static final String EXCEPTION_MSG = "Remote transaction should 
never be finished directly";
 
     private final UUID txId;
-    private final TablePartitionId commitGroupId;
+    private final ZonePartitionId commitGroupId;
     private final long timeout;
     private final UUID coord;
     private final String localNodeConsistentId;
@@ -59,7 +59,7 @@ public abstract class RemoteReadWriteTransaction implements 
InternalTransaction
      * @param localNode Local node.
      * @param timeout The timeout.
      */
-    RemoteReadWriteTransaction(UUID txId, TablePartitionId commitGroupId, UUID 
coord, long token, InternalClusterNode localNode,
+    RemoteReadWriteTransaction(UUID txId, ZonePartitionId commitGroupId, UUID 
coord, long token, InternalClusterNode localNode,
             long timeout) {
         this.txId = txId;
         this.commitGroupId = commitGroupId;
@@ -110,7 +110,7 @@ public abstract class RemoteReadWriteTransaction implements 
InternalTransaction
     }
 
     @Override
-    public TablePartitionId commitPartition() {
+    public ZonePartitionId commitPartition() {
         return commitGroupId;
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 85253160aa1..83231bc39a0 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -952,9 +952,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     }
 
     @Override
-    public InternalTransaction beginRemote(UUID txId, TablePartitionId 
commitPartId, UUID coord, long token, long timeout,
-            Consumer<Throwable> cb) {
-        assert commitPartId.tableId() > 0 && commitPartId.partitionId() >= 0 : 
"Illegal condition for direct mapping: " + commitPartId;
+    public InternalTransaction beginRemote(
+            UUID txId,
+            ZonePartitionId commitPartId,
+            UUID coord,
+            long token,
+            long timeout,
+            Consumer<Throwable> cb
+    ) {
+        assert commitPartId.zoneId() >= 0 && commitPartId.partitionId() >= 0 : 
"Illegal condition for direct mapping: " + commitPartId;
 
         // Switch to default timeout if needed.
         timeout = timeout == USE_CONFIGURED_TIMEOUT_DEFAULT ? 
txConfig.readWriteTimeoutMillis().value() : timeout;


Reply via email to