This is an automated email from the ASF dual-hosted git repository.

ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c3fcdcca26c IGNITE-28385 Fix missing enlistment in client commit 
request handler (#7899)
c3fcdcca26c is described below

commit c3fcdcca26cc3dc4304bd5c99e70f981e1da27b7
Author: Cyrill <[email protected]>
AuthorDate: Wed Apr 1 14:47:08 2026 +0300

    IGNITE-28385 Fix missing enlistment in client commit request handler (#7899)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../tx/ClientTransactionCommitRequest.java         | 10 ++++-
 .../apache/ignite/client/fakes/FakeTxManager.java  |  8 +++-
 .../handlers/TxFinishReplicaRequestHandler.java    | 11 ++++--
 .../handlers/TxRecoveryMessageHandler.java         |  4 +-
 .../replication/PartitionReplicaListenerTest.java  |  2 +-
 .../ZonePartitionReplicaListenerTest.java          |  2 +-
 .../org/apache/ignite/internal/tx/TxManager.java   | 10 ++++-
 .../internal/tx/impl/TxCleanupRequestSender.java   | 14 +++++--
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 10 ++++-
 .../ignite/internal/tx/impl/TxRecoveryEngine.java  | 44 ++++++++++++++++++----
 10 files changed, 92 insertions(+), 23 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index 88896e09cc5..df377d133ae 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -172,8 +172,11 @@ public class ClientTransactionCommitRequest {
         if (existing == null) {
             tx.enlist(replicationGroupId, table.tableId(), consistentId, 
token);
         } else {
+            boolean tokenMatch = existing.consistencyToken() == token;
+            existing.addTableId(table.tableId());
+
             // Enlistment tokens should be equal on commit.
-            return !commit || existing.consistencyToken() == token;
+            return !commit || tokenMatch;
         }
 
         return true;
@@ -207,5 +210,10 @@ public class ClientTransactionCommitRequest {
         long token() {
             return token;
         }
+
+        @Override
+        public String toString() {
+            return "(tableId=" + tableId + ", partId=" + partitionId + ", 
node=" + consistentId + ")";
+        }
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 78f78cb2f2c..77348c22403 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -272,7 +272,13 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, 
String node, UUID txId) {
+    public CompletableFuture<Void> cleanup(
+            ZonePartitionId commitPartitionId,
+            String node,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
         return nullCompletedFuture();
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index 9049f871372..0366339f2d9 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -224,10 +224,13 @@ public class TxFinishReplicaRequestHandler {
                 .map(entry -> new EnlistedPartitionGroup(entry.getKey(), 
entry.getValue().tableIds()))
                 .collect(toList());
         return finishTransaction(enlistedPartitionGroups, txId, commit, 
commitTimestamp)
-                .thenCompose(txResult ->
-                    txManager.cleanup(replicationGroupId, enlistedPartitions, 
commit, commitTimestamp, txId)
-                            .thenApply(v -> txResult)
-                );
+                .thenCompose(txResult -> {
+                    boolean actualCommit = txResult.transactionState() == 
COMMITTED;
+                    HybridTimestamp actualCommitTs = 
txResult.commitTimestamp();
+
+                    return txManager.cleanup(replicationGroupId, 
enlistedPartitions, actualCommit, actualCommitTs, txId)
+                            .thenApply(v -> txResult);
+                });
     }
 
     private static void 
throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, 
TransactionResult txResult) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
index fce8c788686..351dfd4cc61 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.handlers;
 
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 
 import java.util.UUID;
@@ -73,7 +74,8 @@ public class TxRecoveryMessageHandler {
         // Check whether a transaction has already been finished.
         if (txMeta != null && isFinalState(txMeta.txState())) {
             // Tx recovery message is processed on the commit partition.
-            return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, 
senderId);
+            boolean commit = txMeta.txState() == COMMITTED;
+            return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, 
senderId, commit, txMeta.commitTimestamp());
         }
 
         LOG.info(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 0afb63126c6..340c94ed517 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -2050,7 +2050,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).finish(any(), any(), anyBoolean(), any(), 
anyBoolean(), anyBoolean(), any(), any());
         doAnswer(invocation -> nullCompletedFuture())
-                .when(txManager).cleanup(any(), anyString(), any());
+                .when(txManager).cleanup(any(), anyString(), any(), 
anyBoolean(), any());
     }
 
     private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType 
requestType, RwListenerInvocation listenerInvocation) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index b346873ab17..a23d964bb65 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -991,7 +991,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).finish(any(), any(), anyBoolean(), any(), 
anyBoolean(), anyBoolean(), any(), any());
         doAnswer(invocation -> nullCompletedFuture())
-                .when(txManager).cleanup(any(), anyString(), any());
+                .when(txManager).cleanup(any(), anyString(), any(), 
anyBoolean(), any());
     }
 
     private void upsertInNewTxFor(TestKey key) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index c113153d880..83280fdc372 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -263,9 +263,17 @@ public interface TxManager extends IgniteComponent {
      * @param commitPartitionId Commit partition id.
      * @param node Target node.
      * @param txId Transaction id.
+     * @param commit Whether the transaction was committed.
+     * @param commitTimestamp Commit timestamp, if committed.
      * @return Completable future of Void.
      */
-    CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String 
node, UUID txId);
+    CompletableFuture<Void> cleanup(
+            ZonePartitionId commitPartitionId,
+            String node,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    );
 
     /**
      * Locally vacuums no longer needed transactional resources, like txnState 
both persistent and volatile.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index fa17ba72e74..95ef78dcc41 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -188,15 +188,23 @@ public class TxCleanupRequestSender {
     }
 
     /**
-     * Sends unlock request to the nodes than initiated recovery.
+     * Sends cleanup request to the node that initiated recovery.
      *
      * @param commitPartitionId Commit partition id.
      * @param node Target node.
      * @param txId Transaction id.
+     * @param commit Whether the transaction was committed.
+     * @param commitTimestamp Commit timestamp, if committed.
      * @return Completable future of Void.
      */
-    public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, 
String node, UUID txId) {
-        return sendCleanupMessageWithRetries(commitPartitionId, false, null, 
txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
+    public CompletableFuture<Void> cleanup(
+            ZonePartitionId commitPartitionId,
+            String node,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
     }
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index c19c80b3131..595aef5b33e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -1163,8 +1163,14 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     }
 
     @Override
-    public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, 
String node, UUID txId) {
-        return txCleanupRequestSender.cleanup(commitPartitionId, node, txId);
+    public CompletableFuture<Void> cleanup(
+            ZonePartitionId commitPartitionId,
+            String node,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return txCleanupRequestSender.cleanup(commitPartitionId, node, txId, 
commit, commitTimestamp);
     }
 
     @Override
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
index 70f9ab8bc21..787a933b1ac 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
@@ -22,6 +22,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.internal.tx.TxStateMetaFinishing.castToFinishing;
@@ -33,6 +34,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -127,10 +129,20 @@ public class TxRecoveryEngine {
                 })
                 .thenCompose(Function.identity())
                 .whenComplete((v, ex) -> {
-                    runCleanupOnNode(commitPartitionId, txId, 
commitPartitionNode);
+                    // Cleanup must use the actual resolved tx state; using 
commit=false unconditionally
+                    // would corrupt data when the tx is actually COMMITTED 
(abort cleanup races with
+                    // commit cleanup, producing a mix of committed and 
aborted rows).
+                    boolean commit = v != null && v.txState() == COMMITTED;
+                    @Nullable HybridTimestamp commitTs = v != null ? 
v.commitTimestamp() : null;
+
+                    runCleanupOnNode(commitPartitionId, txId, 
commitPartitionNode, commit, commitTs);
 
                     if (senderGroupId != null && senderId != null) {
-                        runCleanupOnNode(senderGroupId, txId, senderId);
+                        String senderConsistentId = 
clusterNodeResolver.getConsistentIdById(senderId);
+
+                        if (senderConsistentId != null) {
+                            runCleanupOnNode(senderGroupId, txId, 
senderConsistentId, commit, commitTs);
+                        }
                     }
                 });
     }
@@ -163,13 +175,21 @@ public class TxRecoveryEngine {
      *
      * @param groupId Group id.
      * @param txId Transaction id.
-     * @param nodeId Node id (inconsistent).
+     * @param nodeId Node id (ephemeral).
+     * @param commit Whether the transaction was committed.
+     * @param commitTimestamp Commit timestamp, if committed.
      */
-    public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId groupId, 
UUID txId, UUID nodeId) {
-        // Get node id of the sender to send back cleanup requests.
+    public CompletableFuture<Void> runCleanupOnNode(
+            ZonePartitionId groupId,
+            UUID txId,
+            UUID nodeId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
         String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
 
-        return nodeConsistentId == null ? nullCompletedFuture() : 
runCleanupOnNode(groupId, txId, nodeConsistentId);
+        return nodeConsistentId == null ? nullCompletedFuture()
+                : runCleanupOnNode(groupId, txId, nodeConsistentId, commit, 
commitTimestamp);
     }
 
     /**
@@ -178,8 +198,16 @@ public class TxRecoveryEngine {
      * @param commitPartitionId Commit partition id.
      * @param txId Transaction id.
      * @param nodeName Node consistent id.
+     * @param commit Whether the transaction was committed.
+     * @param commitTimestamp Commit timestamp, if committed.
      */
-    private CompletableFuture<Void> runCleanupOnNode(ZonePartitionId 
commitPartitionId, UUID txId, String nodeName) {
-        return txManager.cleanup(commitPartitionId, nodeName, txId);
+    private CompletableFuture<Void> runCleanupOnNode(
+            ZonePartitionId commitPartitionId,
+            UUID txId,
+            String nodeName,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return txManager.cleanup(commitPartitionId, nodeName, txId, commit, 
commitTimestamp);
     }
 }

Reply via email to