This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cd0e5017f IGNITE-21070 Ensure that data node's primary replica 
expiration properly handled (#3027)
9cd0e5017f is described below

commit 9cd0e5017fac6d54da5f6c9c802e8ce686267668
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Fri Jan 12 18:18:42 2024 +0300

    IGNITE-21070 Ensure that data node's primary replica expiration properly 
handled (#3027)
---
 .../ItSchemaForwardCompatibilityTest.java          |   2 +-
 .../internal/table/ItTransactionRecoveryTest.java  |  24 +-
 .../IncompatibleSchemaAbortException.java          |  31 ---
 .../replicator/PartitionReplicaListener.java       |  22 +-
 .../replication/PartitionReplicaListenerTest.java  |  12 +-
 .../ignite/internal/tx/impl/OrphanDetector.java    |  85 +++---
 .../internal/tx/impl/TxCleanupRequestHandler.java  |  16 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  42 ++-
 .../tx/impl/WriteIntentSwitchProcessor.java        |  14 +-
 .../apache/ignite/internal/tx/TxCleanupTest.java   | 198 ++++++++++++++
 .../apache/ignite/internal/tx/TxManagerTest.java   | 122 ++++++++-
 .../internal/tx/impl/OrphanDetectorTest.java       | 290 +++++++++++++++++++++
 12 files changed, 704 insertions(+), 154 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java
index 9ed055436d..1d7782a67d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java
@@ -112,7 +112,7 @@ class ItSchemaForwardCompatibilityTest extends 
ClusterPerTestIntegrationTest {
                 ))
         );
 
-        assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR));
+        assertThat(ex.code(), is(Transactions.TX_UNEXPECTED_STATE_ERR));
 
         assertThat(tx.state(), is(TxState.ABORTED));
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index b2bc84c043..a9c1f993cb 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -350,7 +350,8 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
 
-        AtomicInteger msgCount = new AtomicInteger();
+        AtomicInteger stateMsgCount = new AtomicInteger();
+        AtomicInteger recoveryMsgCount = new AtomicInteger();
 
         IgniteImpl roCoordNode = node(0);
 
@@ -360,13 +361,17 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         commitPartNode.dropMessages((nodeName, msg) -> {
             if (msg instanceof TxStateCommitPartitionRequest) {
-                msgCount.incrementAndGet();
+                stateMsgCount.incrementAndGet();
 
                 assertEquals(TxState.ABANDONED, 
txVolatileState(commitPartNode, orphanTxId));
 
                 txMsgCaptureFut.complete(((TxStateCommitPartitionRequest) 
msg).txId());
             }
 
+            if (msg instanceof TxRecoveryMessage) {
+                recoveryMsgCount.incrementAndGet();
+            }
+
             return false;
         });
 
@@ -374,21 +379,19 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
 
         RecordView view = roCoordNode.tables().table(TABLE_NAME).recordView();
 
-        try {
-            view.getAsync(recoveryTxReadOnly, Tuple.create().set("key", 42));
-        } catch (Exception e) {
-            assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
-
-            log.info("Expected lock conflict.", e);
-        }
+        view.getAsync(recoveryTxReadOnly, Tuple.create().set("key", 42));
 
         assertThat(txMsgCaptureFut, willCompleteSuccessfully());
 
         runConflictingTransaction(commitPartNode, 
commitPartNode.transactions().begin());
 
-        assertEquals(1, msgCount.get());
+        assertEquals(1, stateMsgCount.get());
+
+        assertEquals(0, recoveryMsgCount.get());
 
         assertTrue(waitForCondition(() -> txStoredState(commitPartNode, 
orphanTxId) == TxState.ABORTED, 10_000));
+
+        assertTrue(waitForCondition(() -> txStoredMeta(commitPartNode, 
orphanTxId).locksReleased(), 10_000));
     }
 
     /**
@@ -557,7 +560,6 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
         assertInstanceOf(TransactionAlreadyFinishedException.class, 
ExceptionUtils.unwrapCause(errorResponse.throwable()));
 
         assertEquals(TxState.ABORTED, txStoredState(commitPartNode, 
orphanTx.id()));
-
     }
 
     @Test
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaAbortException.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaAbortException.java
deleted file mode 100644
index 4855ee00bf..0000000000
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaAbortException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.replicator;
-
-import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.tx.TransactionException;
-
-/**
- * Thrown when, during an attempt to commit a transaction, it turns out that 
the transaction cannot be committed
- * because an incompatible schema change has happened.
- */
-public class IncompatibleSchemaAbortException extends TransactionException {
-    public IncompatibleSchemaAbortException(String message) {
-        super(Transactions.TX_COMMIT_ERR, message);
-    }
-}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index a003dbc413..979726da73 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -533,7 +533,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
         }
 
-        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+        LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", 
txId, txMeta);
 
         return triggerTxRecovery(txId, senderId);
     }
@@ -1569,7 +1569,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     txId,
                                     txCoordinatorId
                             ).thenApply(txResult -> {
-                                
throwIfSchemaValidationOnCommitFailed(validationResult);
+                                
throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
                                 return txResult;
                             }));
         } else {
@@ -1578,18 +1578,22 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
     }
 
-    private static void 
throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult) {
+    private static void 
throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, 
TransactionResult txResult) {
         if (!validationResult.isSuccessful()) {
             if (validationResult.isTableDropped()) {
                 // TODO: IGNITE-20966 - improve error message.
-                throw new IncompatibleSchemaAbortException(
-                        format("Commit failed because a table was already 
dropped [tableId={}]", validationResult.failedTableId())
+                throw new TransactionAlreadyFinishedException(
+                        format("Commit failed because a table was already 
dropped [tableId={}]", validationResult.failedTableId()),
+                        txResult
                 );
             } else {
                 // TODO: IGNITE-20966 - improve error message.
-                throw new IncompatibleSchemaAbortException("Commit failed 
because schema "
-                        + validationResult.fromSchemaVersion() + " is not 
forward-compatible with "
-                        + validationResult.toSchemaVersion() + " for table " + 
validationResult.failedTableId());
+                throw new TransactionAlreadyFinishedException(
+                        "Commit failed because schema "
+                                + validationResult.fromSchemaVersion() + " is 
not forward-compatible with "
+                                + validationResult.toSchemaVersion() + " for 
table " + validationResult.failedTableId(),
+                        txResult
+                );
             }
         }
     }
@@ -1676,7 +1680,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             @Nullable HybridTimestamp commitTimestamp,
             String txCoordinatorId
     ) {
-        assert !commit || (commitTimestamp != null);
+        assert !(commit && commitTimestamp == null) : "Cannot commit without 
the timestamp.";
 
         HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : 
hybridClock.now();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 58c24ba8a9..5806bc9d80 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -144,7 +144,6 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
-import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
 import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
 import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -159,6 +158,7 @@ import 
org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TransactionAlreadyFinishedException;
 import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TransactionResult;
@@ -1624,9 +1624,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         CompletableFuture<?> future = beginAndCommitTx();
 
-        IncompatibleSchemaAbortException ex = assertWillThrowFast(future,
-                IncompatibleSchemaAbortException.class);
-        assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR));
+        TransactionAlreadyFinishedException ex = assertWillThrowFast(future,
+                TransactionAlreadyFinishedException.class);
+
         assertThat(ex.getMessage(), containsString("Commit failed because 
schema 1 is not forward-compatible with 2"));
 
         assertThat(committed.get(), is(false));
@@ -2323,8 +2323,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 localNode.id()
         );
 
-        IncompatibleSchemaAbortException ex = assertWillThrowFast(future, 
IncompatibleSchemaAbortException.class);
-        assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR));
+        TransactionAlreadyFinishedException ex = assertWillThrowFast(future, 
TransactionAlreadyFinishedException.class);
+
         assertThat(ex.getMessage(), is("Commit failed because a table was 
already dropped [tableId=" + tableToBeDroppedId + "]"));
 
         assertThat("The transaction must have been aborted", committed.get(), 
is(false));
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
index 3fe9a03c89..5032e70747 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -30,12 +30,10 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.event.EventListener;
-import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.TxStateMetaAbandoned;
@@ -59,8 +57,6 @@ public class OrphanDetector {
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
 
-    private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10;
-
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -71,7 +67,7 @@ public class OrphanDetector {
     private final ReplicaService replicaService;
 
     /** Placement driver. */
-    private final PlacementDriver placementDriver;
+    private final PlacementDriverHelper placementDriverHelper;
 
     /** Lock manager. */
     private final LockManager lockManager;
@@ -79,9 +75,6 @@ public class OrphanDetector {
     /** Lock conflict events listener. */
     private final EventListener<LockEventParameters> lockConflictListener = 
this::lockConflictListener;
 
-    /** Hybrid clock. */
-    private final HybridClock clock;
-
     /**
      * The time interval in milliseconds in which the orphan resolution sends 
the recovery message again, in case the transaction is still
      * not finalized.
@@ -96,22 +89,19 @@ public class OrphanDetector {
      *
      * @param topologyService Topology service.
      * @param replicaService Replica service.
-     * @param placementDriver Placement driver.
+     * @param placementDriverHelper Placement driver helper.
      * @param lockManager Lock manager.
-     * @param clock Clock.
      */
     public OrphanDetector(
             TopologyService topologyService,
             ReplicaService replicaService,
-            PlacementDriver placementDriver,
-            LockManager lockManager,
-            HybridClock clock
+            PlacementDriverHelper placementDriverHelper,
+            LockManager lockManager
     ) {
         this.topologyService = topologyService;
         this.replicaService = replicaService;
-        this.placementDriver = placementDriver;
+        this.placementDriverHelper = placementDriverHelper;
         this.lockManager = lockManager;
-        this.clock = clock;
     }
 
     /**
@@ -179,7 +169,7 @@ public class OrphanDetector {
                     txState.txCoordinatorId()
             );
 
-            sentTxRecoveryMessage(txState.commitPartitionId(), txId);
+            sendTxRecoveryMessage(txState.commitPartitionId(), txId);
         }
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
@@ -193,37 +183,33 @@ public class OrphanDetector {
      * @param cmpPartGrp Replication group of commit partition.
      * @param txId Transaction id.
      */
-    private void sentTxRecoveryMessage(ReplicationGroupId cmpPartGrp, UUID 
txId) {
-        placementDriver.awaitPrimaryReplica(
-                cmpPartGrp,
-                clock.now(),
-                AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC,
-                SECONDS
-        ).thenCompose(replicaMeta -> {
-            ClusterNode commitPartPrimaryNode = 
topologyService.getByConsistentId(replicaMeta.getLeaseholder());
-
-            if (commitPartPrimaryNode == null) {
-                LOG.warn(
-                        "The primary replica of the commit partition is not 
available [commitPartGrp={}, tx={}]",
-                        cmpPartGrp,
-                        txId
-                );
-
-                return nullCompletedFuture();
-            }
-
-            return replicaService.invoke(commitPartPrimaryNode, 
FACTORY.txRecoveryMessage()
-                    .groupId(cmpPartGrp)
-                    
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
-                    .txId(txId)
-                    .build());
-        }).exceptionally(throwable -> {
-            if (throwable != null) {
-                LOG.warn("A recovery message for the transaction was handled 
with the error [tx={}].", throwable, txId);
-            }
-
-            return null;
-        });
+    private void sendTxRecoveryMessage(TablePartitionId cmpPartGrp, UUID txId) 
{
+        
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp)
+                .thenCompose(replicaMeta -> {
+                    ClusterNode commitPartPrimaryNode = 
topologyService.getByConsistentId(replicaMeta.getLeaseholder());
+
+                    if (commitPartPrimaryNode == null) {
+                        LOG.warn(
+                                "The primary replica of the commit partition 
is not available [commitPartGrp={}, tx={}]",
+                                cmpPartGrp,
+                                txId
+                        );
+
+                        return nullCompletedFuture();
+                    }
+
+                    return replicaService.invoke(commitPartPrimaryNode, 
FACTORY.txRecoveryMessage()
+                            .groupId(cmpPartGrp)
+                            
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+                            .txId(txId)
+                            .build());
+                }).exceptionally(throwable -> {
+                    if (throwable != null) {
+                        LOG.warn("A recovery message for the transaction was 
handled with the error [tx={}].", throwable, txId);
+                    }
+
+                    return null;
+                });
     }
 
     /**
@@ -270,6 +256,7 @@ public class OrphanDetector {
     private boolean isRecoveryNeeded(TxStateMeta txState) {
         return txState != null
                 && !isFinalState(txState.txState())
+                && txState.txState() != FINISHING
                 && !isTxAbandonedRecently(txState);
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index e79ed8ab4f..eaab109445 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.message.TxCleanupMessage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
-import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 
@@ -42,8 +42,8 @@ public class TxCleanupRequestHandler {
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
 
-    /** Cluster service. */
-    private final ClusterService clusterService;
+    /** Messaging service. */
+    private final MessagingService messagingService;
 
     /** Lock manager. */
     private final LockManager lockManager;
@@ -57,18 +57,18 @@ public class TxCleanupRequestHandler {
     /**
      * The constructor.
      *
-     * @param clusterService Cluster service.
+     * @param messagingService Messaging service.
      * @param lockManager Lock manager.
      * @param clock A hybrid logical clock.
      * @param writeIntentSwitchProcessor A cleanup processor.
      */
     public TxCleanupRequestHandler(
-            ClusterService clusterService,
+            MessagingService messagingService,
             LockManager lockManager,
             HybridClock clock,
             WriteIntentSwitchProcessor writeIntentSwitchProcessor
     ) {
-        this.clusterService = clusterService;
+        this.messagingService = messagingService;
         this.lockManager = lockManager;
         this.hybridClock = clock;
         this.writeIntentSwitchProcessor = writeIntentSwitchProcessor;
@@ -78,7 +78,7 @@ public class TxCleanupRequestHandler {
      * Starts the processor.
      */
     public void start() {
-        
clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, 
sender, correlationId) -> {
+        messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, 
correlationId) -> {
             if (msg instanceof TxCleanupMessage) {
                 processTxCleanup((TxCleanupMessage) msg, sender, 
correlationId);
             }
@@ -133,7 +133,7 @@ public class TxCleanupRequestHandler {
                         });
                     }
 
-                    clusterService.messagingService().respond(senderId, msg, 
correlationId);
+                    messagingService.respond(senderId, msg, correlationId);
                 });
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index f02d0dce80..cd8c65eb67 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -24,7 +24,6 @@ import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
-import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -86,8 +85,10 @@ import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -146,12 +147,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Cluster service. */
-    private final ClusterService clusterService;
-
     /** Detector of transactions that lost the coordinator. */
     private final OrphanDetector orphanDetector;
 
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Messaging service. */
+    private final MessagingService messagingService;
+
     /** Local node network identity. This id is available only after the 
network has started. */
     private String localNodeId;
 
@@ -196,9 +200,10 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.lockManager = lockManager;
         this.clock = clock;
         this.transactionIdGenerator = transactionIdGenerator;
-        this.clusterService = clusterService;
         this.placementDriver = placementDriver;
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
+        this.topologyService = clusterService.topologyService();
+        this.messagingService = clusterService.messagingService();
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, 
clock);
 
@@ -212,14 +217,14 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 new LinkedBlockingQueue<>(),
                 new NamedThreadFactory("tx-async-cleanup", LOG));
 
-        orphanDetector = new OrphanDetector(clusterService.topologyService(), 
replicaService, placementDriver, lockManager, clock);
+        orphanDetector = new OrphanDetector(topologyService, replicaService, 
placementDriverHelper, lockManager);
 
-        txMessageSender = new 
TxMessageSender(clusterService.messagingService(), replicaService, clock);
+        txMessageSender = new TxMessageSender(messagingService, 
replicaService, clock);
 
         WriteIntentSwitchProcessor writeIntentSwitchProcessor =
-                new WriteIntentSwitchProcessor(placementDriverHelper, 
txMessageSender, clusterService);
+                new WriteIntentSwitchProcessor(placementDriverHelper, 
txMessageSender, topologyService);
 
-        txCleanupRequestHandler = new TxCleanupRequestHandler(clusterService, 
lockManager, clock, writeIntentSwitchProcessor);
+        txCleanupRequestHandler = new 
TxCleanupRequestHandler(messagingService, lockManager, clock, 
writeIntentSwitchProcessor);
 
         txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, writeIntentSwitchProcessor);
     }
@@ -293,23 +298,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     }
 
     @Override
-    public TxStateMeta stateMeta(UUID txId) {
+    public @Nullable TxStateMeta stateMeta(UUID txId) {
         return inBusyLock(busyLock, () -> txStateVolatileStorage.state(txId));
     }
 
     @Override
     public @Nullable <T extends TxStateMeta> T updateTxMeta(UUID txId, 
Function<TxStateMeta, TxStateMeta> updater) {
-        return txStateVolatileStorage.updateMeta(txId, oldMeta -> {
-            TxStateMeta newMeta = updater.apply(oldMeta);
-
-            if (newMeta == null) {
-                return null;
-            }
-
-            TxState oldState = oldMeta == null ? null : oldMeta.txState();
-
-            return checkTransitionCorrectness(oldState, newMeta.txState()) ? 
newMeta : oldMeta;
-        });
+        return txStateVolatileStorage.updateMeta(txId, updater);
     }
 
     @Override
@@ -590,8 +585,9 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     @Override
     public CompletableFuture<Void> start() {
-        localNodeId = clusterService.topologyService().localMember().id();
-        
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
this);
+        localNodeId = topologyService.localMember().id();
+
+        messagingService.addMessageHandler(ReplicaMessageGroup.class, this);
 
         txStateVolatileStorage.start();
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
index eccd5d058d..82b7905c1c 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.CompletableFutures;
-import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -42,24 +42,24 @@ public class WriteIntentSwitchProcessor {
 
     private final TxMessageSender txMessageSender;
 
-    /** Cluster service. */
-    private final ClusterService clusterService;
+    /** Topology service. */
+    private final TopologyService topologyService;
 
     /**
      * The constructor.
      *
      * @param placementDriverHelper Placement driver helper.
      * @param txMessageSender Transaction message creator.
-     * @param clusterService Cluster service.
+     * @param topologyService Topology service.
      */
     public WriteIntentSwitchProcessor(
             PlacementDriverHelper placementDriverHelper,
             TxMessageSender txMessageSender,
-            ClusterService clusterService
+            TopologyService topologyService
     ) {
         this.placementDriverHelper = placementDriverHelper;
         this.txMessageSender = txMessageSender;
-        this.clusterService = clusterService;
+        this.topologyService = topologyService;
     }
 
     /**
@@ -71,7 +71,7 @@ public class WriteIntentSwitchProcessor {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
     ) {
-        String localNodeName = 
clusterService.topologyService().localMember().name();
+        String localNodeName = topologyService.localMember().name();
 
         return txMessageSender.switchWriteIntents(localNodeName, 
tablePartitionId, txId, commit, commitTimestamp);
     }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
new file mode 100644
index 0000000000..498d7c2d02
--- /dev/null
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
+import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender;
+import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.WriteIntentSwitchProcessor;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for transaction cleanup.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TxCleanupTest extends IgniteAbstractTest {
+
+    private static final ClusterNode LOCAL_NODE =
+            new ClusterNodeImpl("local_id", "local", new 
NetworkAddress("127.0.0.1", 2024), null);
+
+    private static final ClusterNode REMOTE_NODE =
+            new ClusterNodeImpl("remote_id", "remote", new 
NetworkAddress("127.1.1.1", 2024), null);
+
+    @Mock(answer = RETURNS_DEEP_STUBS)
+    private MessagingService messagingService;
+
+    @Mock(answer = RETURNS_DEEP_STUBS)
+    private TopologyService topologyService;
+
+    @Mock(answer = RETURNS_DEEP_STUBS)
+    private ReplicaService replicaService;
+
+    @Mock
+    private PlacementDriver placementDriver;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private TxCleanupRequestSender cleanupRequestSender;
+
+    private TransactionIdGenerator idGenerator;
+
+    private WriteIntentSwitchProcessor writeIntentSwitchProcessor;
+
+    private TxMessageSender txMessageSender;
+
+    /** Init test callback. */
+    @BeforeEach
+    public void setup() {
+        
when(topologyService.localMember().address()).thenReturn(LOCAL_NODE.address());
+
+        when(messagingService.invoke(anyString(), any(), 
anyLong())).thenReturn(nullCompletedFuture());
+
+        idGenerator = new TransactionIdGenerator(LOCAL_NODE.name().hashCode());
+
+        txMessageSender = spy(new TxMessageSender(messagingService, 
replicaService, clock));
+
+        PlacementDriverHelper placementDriverHelper = new 
PlacementDriverHelper(placementDriver, clock);
+
+        writeIntentSwitchProcessor = spy(new 
WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, 
topologyService));
+
+        cleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, writeIntentSwitchProcessor);
+    }
+
+    @Test
+    void testPrimaryFoundForAllPartitions() {
+        TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
+        TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
+        TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0);
+
+        Set<TablePartitionId> partitions = Set.of(tablePartitionId1, 
tablePartitionId2, tablePartitionId3);
+
+        when(placementDriver.getPrimaryReplica(any(), any()))
+                .thenReturn(completedFuture(new 
TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+
+        HybridTimestamp beginTimestamp = clock.now();
+        UUID txId = idGenerator.transactionIdFor(beginTimestamp);
+
+        HybridTimestamp commitTimestamp = clock.now();
+
+        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+
+        assertThat(cleanup, willCompleteSuccessfully());
+
+        verifyNoInteractions(writeIntentSwitchProcessor);
+        verify(txMessageSender, times(1)).cleanup(any(), any(), any(), 
anyBoolean(), any());
+        verifyNoMoreInteractions(txMessageSender);
+    }
+
+    @Test
+    void testPrimaryNotFoundForSome() {
+        TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
+        TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
+        TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0);
+
+        Set<TablePartitionId> partitions = Set.of(tablePartitionId1, 
tablePartitionId2, tablePartitionId3);
+
+        when(placementDriver.getPrimaryReplica(any(), any()))
+                .thenReturn(completedFuture(new 
TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+        when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any()))
+                .thenReturn(nullCompletedFuture());
+
+        when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(), 
anyLong(), any()))
+                .thenReturn(completedFuture(new 
TestReplicaMetaImpl(REMOTE_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+
+        HybridTimestamp beginTimestamp = clock.now();
+        UUID txId = idGenerator.transactionIdFor(beginTimestamp);
+
+        HybridTimestamp commitTimestamp = clock.now();
+
+        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+
+        assertThat(cleanup, willCompleteSuccessfully());
+
+        verify(txMessageSender, times(1)).switchWriteIntents(any(), any(), 
any(), anyBoolean(), any());
+        verify(txMessageSender, times(1)).cleanup(any(), any(), any(), 
anyBoolean(), any());
+        verifyNoMoreInteractions(txMessageSender);
+    }
+
+    @Test
+    void testPrimaryNotFoundForAll() {
+        TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
+        TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
+        TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0);
+
+        Set<TablePartitionId> partitions = Set.of(tablePartitionId1, 
tablePartitionId2, tablePartitionId3);
+
+        when(placementDriver.getPrimaryReplica(any(), any()))
+                .thenReturn(nullCompletedFuture());
+
+        when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any()))
+                .thenReturn(completedFuture(new 
TestReplicaMetaImpl(REMOTE_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+
+        HybridTimestamp beginTimestamp = clock.now();
+        UUID txId = idGenerator.transactionIdFor(beginTimestamp);
+
+        HybridTimestamp commitTimestamp = clock.now();
+
+        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+
+        assertThat(cleanup, willCompleteSuccessfully());
+
+        verify(txMessageSender, times(3)).switchWriteIntents(any(), any(), 
any(), anyBoolean(), any());
+        verifyNoMoreInteractions(txMessageSender);
+    }
+}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 22cad83558..1614346449 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx;
 
 import static java.lang.Math.abs;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
@@ -39,6 +40,8 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -57,6 +60,7 @@ import 
org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -95,10 +99,10 @@ public class TxManagerTest extends IgniteAbstractTest {
 
     private TxManager txManager;
 
-    @Mock
+    @Mock(answer = RETURNS_DEEP_STUBS)
     private ClusterService clusterService;
 
-    @Mock
+    @Mock(answer = RETURNS_DEEP_STUBS)
     private ReplicaService replicaService;
 
     private final HybridClock clock = spy(new HybridClockImpl());
@@ -112,12 +116,8 @@ public class TxManagerTest extends IgniteAbstractTest {
     /** Init test callback. */
     @BeforeEach
     public void setup() {
-        clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
-
         
when(clusterService.topologyService().localMember().address()).thenReturn(LOCAL_NODE.address());
 
-        replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
-
         when(replicaService.invoke(any(ClusterNode.class), 
any())).thenReturn(nullCompletedFuture());
 
         when(replicaService.invoke(anyString(), 
any())).thenReturn(nullCompletedFuture());
@@ -375,18 +375,123 @@ public class TxManagerTest extends IgniteAbstractTest {
         assertRollbackSucceeds();
     }
 
+    @Test
+    public void testPrimaryMissOnFirstCall() {
+        // First call to the commit partition primary fails with 
PrimaryReplicaMissException,
+        // then we retry and the second call succeeds.
+        when(placementDriver.getPrimaryReplica(any(), any()))
+                .thenReturn(
+                        completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), hybridTimestamp(10)))
+                );
+        when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any()))
+                .thenReturn(
+                        completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), hybridTimestamp(10))),
+                        completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(12), HybridTimestamp.MAX_VALUE))
+                );
+
+        HybridTimestamp commitTimestamp = hybridTimestamp(9);
+
+        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
+                .thenReturn(
+                        failedFuture(new PrimaryReplicaMissException(
+                                LOCAL_NODE.name(),
+                                null,
+                                10L,
+                                null,
+                                null
+                        )),
+                        completedFuture(new 
TransactionResult(TxState.COMMITTED, commitTimestamp))
+                );
+
+        when(clock.now()).thenReturn(hybridTimestamp(5));
+        // Ensure that commit doesn't throw exceptions.
+        InternalTransaction committedTransaction = prepareTransaction();
+
+        when(clock.now()).thenReturn(commitTimestamp, hybridTimestamp(13));
+
+        committedTransaction.commit();
+        assertEquals(TxState.COMMITTED, 
txManager.stateMeta(committedTransaction.id()).txState());
+        assertEquals(hybridTimestamp(9), 
txManager.stateMeta(committedTransaction.id()).commitTimestamp());
+
+        // Ensure that rollback doesn't throw exceptions.
+        assertRollbackSucceeds();
+    }
+
     @Test
     public void testFinishExpiredWithNullPrimary() {
         // Null is returned as primaryReplica during finish phase.
         when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(nullCompletedFuture());
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
hybridTimestamp(10))));
+        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
+                .thenReturn(completedFuture(new 
TransactionResult(TxState.ABORTED, null)));
 
         
assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause();
 
         assertRollbackSucceeds();
     }
 
+    @Test
+    public void testExpiredExceptionDoesNotShadeResponseExceptions() {
+        // Null is returned as primaryReplica during finish phase.
+        when(placementDriver.getPrimaryReplica(any(), 
any())).thenReturn(nullCompletedFuture());
+        when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
+                new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), 
hybridTimestamp(10))));
+        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
+                .thenReturn(failedFuture(new 
TransactionAlreadyFinishedException(
+                        "TX already finished.",
+                        new TransactionResult(TxState.ABORTED, null)
+                )));
+
+        InternalTransaction committedTransaction = prepareTransaction();
+
+        assertThrowsWithCause(committedTransaction::commit, 
TransactionAlreadyFinishedException.class);
+
+        assertEquals(TxState.ABORTED, 
txManager.stateMeta(committedTransaction.id()).txState());
+
+        assertRollbackSucceeds();
+    }
+
+    @Test
+    public void testOnlyPrimaryExpirationAffectsTransaction() {
+        // Prepare transaction.
+        InternalTransaction tx = txManager.begin(hybridTimestampTracker);
+
+        ClusterNode node = mock(ClusterNode.class);
+
+        TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0);
+        tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L));
+        tx.assignCommitPartition(tablePartitionId1);
+
+        TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0);
+        tx.enlist(tablePartitionId2, new IgniteBiTuple<>(node, 1L));
+
+        when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any()))
+                .thenReturn(completedFuture(
+                        new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), HybridTimestamp.MAX_VALUE)));
+        when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(), 
anyLong(), any()))
+                .thenReturn(completedFuture(
+                        new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), HybridTimestamp.MAX_VALUE)));
+
+        
lenient().when(placementDriver.getPrimaryReplica(eq(tablePartitionId2), any()))
+                .thenReturn(nullCompletedFuture());
+        
lenient().when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId2), 
any(), anyLong(), any()))
+                .thenReturn(completedFuture(
+                        new TestReplicaMetaImpl(LOCAL_NODE, 
hybridTimestamp(1), hybridTimestamp(10))));
+
+        HybridTimestamp commitTimestamp = clock.now();
+        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
+                .thenReturn(completedFuture(new 
TransactionResult(TxState.COMMITTED, commitTimestamp)));
+
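+        // Ensure that commit doesn't throw exceptions.
+        // NOTE(review): the committed transaction comes from prepareTransaction();
+        // the 'tx' enlisted above is not used again in this test - confirm intended.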
+        InternalTransaction committedTransaction = prepareTransaction();
+        committedTransaction.commit();
+        assertEquals(TxState.COMMITTED, 
txManager.stateMeta(committedTransaction.id()).txState());
+
+        // Ensure that rollback doesn't throw exceptions.
+        assertRollbackSucceeds();
+    }
+
     @Test
     public void testFinishExpiredWithExpiredPrimary() {
         // Primary with expirationTimestamp less than commitTimestamp is 
returned.
@@ -419,6 +524,8 @@ public class TxManagerTest extends IgniteAbstractTest {
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), 
HybridTimestamp.MAX_VALUE)));
         when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(
                 new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), 
HybridTimestamp.MAX_VALUE)));
+        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
+                .thenReturn(completedFuture(new 
TransactionResult(TxState.ABORTED, null)));
 
         
assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause();
 
@@ -438,9 +545,6 @@ public class TxManagerTest extends IgniteAbstractTest {
     }
 
     private void 
assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause()
 {
-        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
-                .thenReturn(completedFuture(new 
TransactionResult(TxState.ABORTED, null)));
-
         InternalTransaction committedTransaction = prepareTransaction();
         Throwable throwable = 
assertThrowsWithCause(committedTransaction::commit, 
PrimaryReplicaExpiredException.class);
 
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
new file mode 100644
index 0000000000..13b799a1ba
--- /dev/null
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.Lock;
+import org.apache.ignite.internal.tx.LockException;
+import org.apache.ignite.internal.tx.LockKey;
+import org.apache.ignite.internal.tx.LockMode;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests how OrphanDetector reacts to tx lock conflicts.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+public class OrphanDetectorTest extends BaseIgniteAbstractTest {
+
+    private static final ClusterNode LOCAL_NODE =
+            new ClusterNodeImpl("local_id", "local", new 
NetworkAddress("127.0.0.1", 2024), null);
+
+    private static final ClusterNode REMOTE_NODE =
+            new ClusterNodeImpl("remote_id", "remote", new 
NetworkAddress("127.1.1.1", 2024), null);
+
+    @Mock(answer = RETURNS_DEEP_STUBS)
+    private TopologyService topologyService;
+
+    @Mock(answer = RETURNS_DEEP_STUBS)
+    private ReplicaService replicaService;
+
+    @Mock
+    private PlacementDriver placementDriver;
+
+    private final HeapLockManager lockManager = new HeapLockManager();
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @InjectConfiguration
+    private TransactionConfiguration txConfiguration;
+
+    private VolatileTxStateMetaStorage txStateMetaStorage;
+
+    private TransactionIdGenerator idGenerator;
+
+    @BeforeEach
+    public void setup() {
+        idGenerator = new TransactionIdGenerator(LOCAL_NODE.name().hashCode());
+
+        PlacementDriverHelper placementDriverHelper = new 
PlacementDriverHelper(placementDriver, clock);
+
+        OrphanDetector orphanDetector = new OrphanDetector(topologyService, 
replicaService, placementDriverHelper, lockManager);
+
+        txStateMetaStorage = new VolatileTxStateMetaStorage();
+
+        txStateMetaStorage.start();
+
+        orphanDetector.start(txStateMetaStorage, 
txConfiguration.abandonedCheckTs());
+    }
+
+    @Test
+    void testNoTriggerNoState() {
+        UUID orphanTxId = idGenerator.transactionIdFor(clock.now());
+
+        RowId rowId = new RowId(0);
+
+        // Coordinator is dead.
+        when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null);
+
+        lockManager.acquire(orphanTxId, new LockKey(1, rowId), LockMode.X);
+
+        UUID concurrentTxId = idGenerator.transactionIdFor(clock.now());
+
+        // Should trigger lock conflict listener in OrphanDetector.
+        lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X);
+
+        TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId);
+
+        // OrphanDetector didn't change the state.
+        assertNull(orphanState);
+
+        verifyNoInteractions(replicaService);
+    }
+
+    @Test
+    void testNoTriggerCommittedState() {
+        UUID orphanTxId = idGenerator.transactionIdFor(clock.now());
+
+        TablePartitionId tpId = new TablePartitionId(1, 0);
+
+        RowId rowId = new RowId(tpId.partitionId());
+
+        // Coordinator is dead.
+        when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null);
+
+        lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), 
LockMode.X);
+
+        UUID concurrentTxId = idGenerator.transactionIdFor(clock.now());
+
+        TxStateMeta committedState = new TxStateMeta(TxState.COMMITTED, 
LOCAL_NODE.id(), tpId, clock.now());
+
+        txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> committedState);
+
+        // Should trigger lock conflict listener in OrphanDetector.
+        lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X);
+
+        TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId);
+
+        // OrphanDetector didn't change the state.
+        assertEquals(committedState, orphanState);
+
+        verifyNoInteractions(replicaService);
+    }
+
+    @Test
+    void testNoTriggerAbortedState() {
+        UUID orphanTxId = idGenerator.transactionIdFor(clock.now());
+
+        TablePartitionId tpId = new TablePartitionId(1, 0);
+
+        RowId rowId = new RowId(tpId.partitionId());
+
+        // Coordinator is dead.
+        when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null);
+
+        lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), 
LockMode.X);
+
+        UUID concurrentTxId = idGenerator.transactionIdFor(clock.now());
+
+        TxStateMeta abortedState = new TxStateMeta(TxState.ABORTED, 
LOCAL_NODE.id(), tpId, null);
+
+        txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> abortedState);
+
+        // Should trigger lock conflict listener in OrphanDetector.
+        lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X);
+
+        TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId);
+
+        // OrphanDetector didn't change the state.
+        assertEquals(abortedState, orphanState);
+
+        verifyNoInteractions(replicaService);
+    }
+
+    @Test
+    void testNoTriggerFinishingState() {
+        UUID orphanTxId = idGenerator.transactionIdFor(clock.now());
+
+        TablePartitionId tpId = new TablePartitionId(1, 0);
+
+        RowId rowId = new RowId(tpId.partitionId());
+
+        lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), 
LockMode.X);
+
+        UUID concurrentTxId = idGenerator.transactionIdFor(clock.now());
+
+        TxStateMeta finishingState = new TxStateMeta(TxState.FINISHING, 
LOCAL_NODE.id(), tpId, null);
+
+        txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> finishingState);
+
+        // Coordinator is dead.
+        when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null);
+
+        // Should trigger lock conflict listener in OrphanDetector.
+        lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X);
+
+        TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId);
+
+        // OrphanDetector didn't change the state.
+        assertEquals(finishingState, orphanState);
+
+        verifyNoInteractions(replicaService);
+    }
+
+    @Test
+    void testNoTriggerCoordinatorAlive() {
+        UUID orphanTxId = idGenerator.transactionIdFor(clock.now());
+
+        TablePartitionId tpId = new TablePartitionId(1, 0);
+
+        RowId rowId = new RowId(tpId.partitionId());
+
+        lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), 
LockMode.X);
+
+        UUID concurrentTxId = idGenerator.transactionIdFor(clock.now());
+
+        TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, 
LOCAL_NODE.id(), tpId, null);
+
+        txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState);
+
+        
when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(mock(ClusterNode.class));
+
+        // Should trigger lock conflict listener in OrphanDetector.
+        lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X);
+
+        TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId);
+
+        // OrphanDetector didn't change the state.
+        assertEquals(pendingState, orphanState);
+
+        verifyNoInteractions(replicaService);
+    }
+
+    @Test
+    void testTriggerOnLockConflictCoordinatorDead() {
+        UUID orphanTxId = idGenerator.transactionIdFor(clock.now());
+
+        TablePartitionId tpId = new TablePartitionId(1, 0);
+
+        RowId rowId = new RowId(tpId.partitionId());
+
+        when(placementDriver.awaitPrimaryReplica(eq(tpId), any(), anyLong(), 
any()))
+                .thenReturn(completedFuture(new 
TestReplicaMetaImpl(REMOTE_NODE, hybridTimestamp(1), 
HybridTimestamp.MAX_VALUE)));
+
+        lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), 
LockMode.X);
+
+        UUID concurrentTxId = idGenerator.transactionIdFor(clock.now());
+
+        TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, 
LOCAL_NODE.id(), tpId, null);
+
+        txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState);
+
+        // Coordinator is dead.
+        when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null);
+
+        // Should trigger lock conflict listener in OrphanDetector.
+        CompletableFuture<Lock> acquire = lockManager.acquire(concurrentTxId, 
new LockKey(1, rowId), LockMode.X);
+
+        TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId);
+
+        // OrphanDetector marked the transaction as abandoned.
+        assertEquals(TxState.ABANDONED, orphanState.txState());
+
+        // Send tx recovery message.
+        verify(replicaService).invoke(any(ClusterNode.class), any());
+
+        assertThat(acquire, willThrow(LockException.class, "Failed to acquire 
an abandoned lock due to a possible deadlock"));
+    }
+}

Reply via email to