This is an automated email from the ASF dual-hosted git repository.
ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c3fcdcca26c IGNITE-28385 Fix missing enlistment in client commit
request handler (#7899)
c3fcdcca26c is described below
commit c3fcdcca26cc3dc4304bd5c99e70f981e1da27b7
Author: Cyrill <[email protected]>
AuthorDate: Wed Apr 1 14:47:08 2026 +0300
IGNITE-28385 Fix missing enlistment in client commit request handler (#7899)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../tx/ClientTransactionCommitRequest.java | 10 ++++-
.../apache/ignite/client/fakes/FakeTxManager.java | 8 +++-
.../handlers/TxFinishReplicaRequestHandler.java | 11 ++++--
.../handlers/TxRecoveryMessageHandler.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 2 +-
.../ZonePartitionReplicaListenerTest.java | 2 +-
.../org/apache/ignite/internal/tx/TxManager.java | 10 ++++-
.../internal/tx/impl/TxCleanupRequestSender.java | 14 +++++--
.../ignite/internal/tx/impl/TxManagerImpl.java | 10 ++++-
.../ignite/internal/tx/impl/TxRecoveryEngine.java | 44 ++++++++++++++++++----
10 files changed, 92 insertions(+), 23 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index 88896e09cc5..df377d133ae 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -172,8 +172,11 @@ public class ClientTransactionCommitRequest {
if (existing == null) {
tx.enlist(replicationGroupId, table.tableId(), consistentId,
token);
} else {
+ boolean tokenMatch = existing.consistencyToken() == token;
+ existing.addTableId(table.tableId());
+
// Enlistment tokens should be equal on commit.
- return !commit || existing.consistencyToken() == token;
+ return !commit || tokenMatch;
}
return true;
@@ -207,5 +210,10 @@ public class ClientTransactionCommitRequest {
long token() {
return token;
}
+
+ @Override
+ public String toString() {
+ return "(tableId=" + tableId + ", partId=" + partitionId + ",
node=" + consistentId + ")";
+ }
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 78f78cb2f2c..77348c22403 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -272,7 +272,13 @@ public class FakeTxManager implements TxManager {
}
@Override
- public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId,
String node, UUID txId) {
+ public CompletableFuture<Void> cleanup(
+ ZonePartitionId commitPartitionId,
+ String node,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
return nullCompletedFuture();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index 9049f871372..0366339f2d9 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -224,10 +224,13 @@ public class TxFinishReplicaRequestHandler {
.map(entry -> new EnlistedPartitionGroup(entry.getKey(),
entry.getValue().tableIds()))
.collect(toList());
return finishTransaction(enlistedPartitionGroups, txId, commit,
commitTimestamp)
- .thenCompose(txResult ->
- txManager.cleanup(replicationGroupId, enlistedPartitions,
commit, commitTimestamp, txId)
- .thenApply(v -> txResult)
- );
+ .thenCompose(txResult -> {
+ boolean actualCommit = txResult.transactionState() ==
COMMITTED;
+ HybridTimestamp actualCommitTs =
txResult.commitTimestamp();
+
+ return txManager.cleanup(replicationGroupId,
enlistedPartitions, actualCommit, actualCommitTs, txId)
+ .thenApply(v -> txResult);
+ });
}
private static void
throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult,
TransactionResult txResult) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
index fce8c788686..351dfd4cc61 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator.handlers;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import java.util.UUID;
@@ -73,7 +74,8 @@ public class TxRecoveryMessageHandler {
// Check whether a transaction has already been finished.
if (txMeta != null && isFinalState(txMeta.txState())) {
// Tx recovery message is processed on the commit partition.
- return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId,
senderId);
+ boolean commit = txMeta.txState() == COMMITTED;
+ return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId,
senderId, commit, txMeta.commitTimestamp());
}
LOG.info(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 0afb63126c6..340c94ed517 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -2050,7 +2050,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).finish(any(), any(), anyBoolean(), any(),
anyBoolean(), anyBoolean(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
- .when(txManager).cleanup(any(), anyString(), any());
+ .when(txManager).cleanup(any(), anyString(), any(),
anyBoolean(), any());
}
private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType
requestType, RwListenerInvocation listenerInvocation) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index b346873ab17..a23d964bb65 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -991,7 +991,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).finish(any(), any(), anyBoolean(), any(),
anyBoolean(), anyBoolean(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
- .when(txManager).cleanup(any(), anyString(), any());
+ .when(txManager).cleanup(any(), anyString(), any(),
anyBoolean(), any());
}
private void upsertInNewTxFor(TestKey key) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index c113153d880..83280fdc372 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -263,9 +263,17 @@ public interface TxManager extends IgniteComponent {
* @param commitPartitionId Commit partition id.
* @param node Target node.
* @param txId Transaction id.
+ * @param commit Whether the transaction was committed.
+ * @param commitTimestamp Commit timestamp, if committed.
* @return Completable future of Void.
*/
- CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String
node, UUID txId);
+ CompletableFuture<Void> cleanup(
+ ZonePartitionId commitPartitionId,
+ String node,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ );
/**
* Locally vacuums no longer needed transactional resources, like txnState
both persistent and volatile.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index fa17ba72e74..95ef78dcc41 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -188,15 +188,23 @@ public class TxCleanupRequestSender {
}
/**
- * Sends unlock request to the nodes than initiated recovery.
+ * Sends cleanup request to the node that initiated recovery.
*
* @param commitPartitionId Commit partition id.
* @param node Target node.
* @param txId Transaction id.
+ * @param commit Whether the transaction was committed.
+ * @param commitTimestamp Commit timestamp, if committed.
* @return Completable future of Void.
*/
- public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId,
String node, UUID txId) {
- return sendCleanupMessageWithRetries(commitPartitionId, false, null,
txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
+ public CompletableFuture<Void> cleanup(
+ ZonePartitionId commitPartitionId,
+ String node,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ return sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index c19c80b3131..595aef5b33e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -1163,8 +1163,14 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
}
@Override
- public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId,
String node, UUID txId) {
- return txCleanupRequestSender.cleanup(commitPartitionId, node, txId);
+ public CompletableFuture<Void> cleanup(
+ ZonePartitionId commitPartitionId,
+ String node,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ return txCleanupRequestSender.cleanup(commitPartitionId, node, txId,
commit, commitTimestamp);
}
@Override
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
index 70f9ab8bc21..787a933b1ac 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
@@ -22,6 +22,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static
org.apache.ignite.internal.tx.TxStateMetaFinishing.castToFinishing;
@@ -33,6 +34,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -127,10 +129,20 @@ public class TxRecoveryEngine {
})
.thenCompose(Function.identity())
.whenComplete((v, ex) -> {
- runCleanupOnNode(commitPartitionId, txId,
commitPartitionNode);
+ // Cleanup must use the actual resolved tx state; using
commit=false unconditionally
+ // would corrupt data when the tx is actually COMMITTED
(abort cleanup races with
+ // commit cleanup, producing a mix of committed and
aborted rows).
+ boolean commit = v != null && v.txState() == COMMITTED;
+ @Nullable HybridTimestamp commitTs = v != null ?
v.commitTimestamp() : null;
+
+ runCleanupOnNode(commitPartitionId, txId,
commitPartitionNode, commit, commitTs);
if (senderGroupId != null && senderId != null) {
- runCleanupOnNode(senderGroupId, txId, senderId);
+ String senderConsistentId =
clusterNodeResolver.getConsistentIdById(senderId);
+
+ if (senderConsistentId != null) {
+ runCleanupOnNode(senderGroupId, txId,
senderConsistentId, commit, commitTs);
+ }
}
});
}
@@ -163,13 +175,21 @@ public class TxRecoveryEngine {
*
* @param groupId Group id.
* @param txId Transaction id.
- * @param nodeId Node id (inconsistent).
+ * @param nodeId Node id (ephemeral).
+ * @param commit Whether the transaction was committed.
+ * @param commitTimestamp Commit timestamp, if committed.
*/
- public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId groupId,
UUID txId, UUID nodeId) {
- // Get node id of the sender to send back cleanup requests.
+ public CompletableFuture<Void> runCleanupOnNode(
+ ZonePartitionId groupId,
+ UUID txId,
+ UUID nodeId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
String nodeConsistentId =
clusterNodeResolver.getConsistentIdById(nodeId);
- return nodeConsistentId == null ? nullCompletedFuture() :
runCleanupOnNode(groupId, txId, nodeConsistentId);
+ return nodeConsistentId == null ? nullCompletedFuture()
+ : runCleanupOnNode(groupId, txId, nodeConsistentId, commit,
commitTimestamp);
}
/**
@@ -178,8 +198,16 @@ public class TxRecoveryEngine {
* @param commitPartitionId Commit partition id.
* @param txId Transaction id.
* @param nodeName Node consistent id.
+ * @param commit Whether the transaction was committed.
+ * @param commitTimestamp Commit timestamp, if committed.
*/
- private CompletableFuture<Void> runCleanupOnNode(ZonePartitionId
commitPartitionId, UUID txId, String nodeName) {
- return txManager.cleanup(commitPartitionId, nodeName, txId);
+ private CompletableFuture<Void> runCleanupOnNode(
+ ZonePartitionId commitPartitionId,
+ UUID txId,
+ String nodeName,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ return txManager.cleanup(commitPartitionId, nodeName, txId, commit,
commitTimestamp);
}
}