This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c41a26f2d8 IGNITE-21761 Within commitPartition mark txnState with 
cleanup finished timestamp (#3496)
c41a26f2d8 is described below

commit c41a26f2d85d95468846624192b353f722c1395d
Author: Cyrill <[email protected]>
AuthorDate: Tue Apr 2 16:40:00 2024 +0300

    IGNITE-21761 Within commitPartition mark txnState with cleanup finished 
timestamp (#3496)
---
 .../ignite/internal/replicator/ReplicaManager.java |   2 -
 .../ignite/internal/table/ItDurableFinishTest.java |  53 +++++++++++
 .../replicator/PartitionReplicaListener.java       |  41 +++++---
 .../org/apache/ignite/internal/tx/TxStateMeta.java |  49 +++++++++-
 .../internal/tx/impl/PlacementDriverHelper.java    |  45 +++++++--
 .../internal/tx/impl/TxCleanupRequestHandler.java  | 106 ++++++++++++++++++++-
 .../internal/tx/impl/TxCleanupRequestSender.java   | 103 +++++++++++++++++---
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  10 +-
 .../ignite/internal/tx/impl/TxMessageSender.java   |   7 +-
 .../tx/impl/WriteIntentSwitchProcessor.java        |   9 +-
 ...ageResponse.java => CleanupReplicatedInfo.java} |  30 +++++-
 .../tx/message/TxCleanupMessageResponse.java       |   6 ++
 ...e.java => WriteIntentSwitchReplicatedInfo.java} |  35 ++++++-
 .../apache/ignite/internal/tx/TxCleanupTest.java   |  18 ++--
 14 files changed, 443 insertions(+), 71 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b721e39df3..b9a17a5d41 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -351,8 +351,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 }
 
                 if (ex == null && res.replicationFuture() != null) {
-                    assert request instanceof PrimaryReplicaRequest;
-
                     res.replicationFuture().whenComplete((res0, ex0) -> {
                         NetworkMessage msg0;
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 06ca6325f1..3da2b2425d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -28,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,7 +49,11 @@ import 
org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
 import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
 import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -314,6 +320,53 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         storage.put(tx.id(), txMetaToSet);
     }
 
+
+    @Test
+    void testCleanupReplicatedMessage() throws ExecutionException, 
InterruptedException {
+        Context context = prepareTransactionData();
+
+        DefaultMessagingService primaryMessaging = 
messaging(context.primaryNode);
+
+        CompletableFuture<Void> cleanupReplicatedFuture = new 
CompletableFuture<>();
+
+        primaryMessaging.dropMessages((s, networkMessage) -> {
+            if (networkMessage instanceof TxCleanupMessageResponse) {
+                logger().info("Received message: {}.", networkMessage);
+
+                TxCleanupMessageResponse message = (TxCleanupMessageResponse) 
networkMessage;
+
+                if (message instanceof TxCleanupMessageErrorResponse) {
+                    TxCleanupMessageErrorResponse error = 
(TxCleanupMessageErrorResponse) message;
+
+                    logger().error("Cleanup Error: ", error);
+
+                    return false;
+                }
+
+                CleanupReplicatedInfo result = message.result();
+
+                if (result != null) {
+                    cleanupReplicatedFuture.complete(null);
+                }
+            }
+
+            return false;
+        });
+
+        commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+        assertThat(cleanupReplicatedFuture, willCompleteSuccessfully());
+
+        assertTrue(waitForCondition(
+                () -> {
+                    TxStateMeta stateMeta = 
context.primaryNode.txManager().stateMeta(context.tx.id());
+
+                    return stateMeta != null && 
stateMeta.cleanupCompletionTimestamp() != null;
+                },
+                10_000)
+        );
+    }
+
     private @Nullable Integer nodeIndex(String name) {
         for (int i = 0; i < initialNodes(); i++) {
             if (node(i).name().equals(name)) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index e5b42932a6..76401c9fd8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -177,6 +177,7 @@ import 
org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Cursor;
@@ -1711,9 +1712,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * This operation is idempotent, so it's safe to retry it.
      *
      * @param request Transaction cleanup request.
-     * @return CompletableFuture of void.
+     * @return CompletableFuture of ReplicaResult.
      */
-    private CompletableFuture<Void> 
processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
+    private CompletableFuture<ReplicaResult> 
processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
         markFinished(request.txId(), request.commit() ? COMMITTED : ABORTED, 
request.commitTimestamp());
 
         return awaitCleanupReadyFutures(request.txId(), request.commit())
@@ -1722,19 +1723,22 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         HybridTimestamp commandTimestamp = clockService.now();
 
                         return reliableCatalogVersionFor(commandTimestamp)
-                                .thenCompose(catalogVersion -> {
-                                    applyWriteIntentSwitchCommand(
-                                            request.txId(),
-                                            request.commit(),
-                                            request.commitTimestamp(),
-                                            request.commitTimestampLong(),
-                                            catalogVersion
-                                    );
+                                .thenApply(catalogVersion -> {
+                                    
CompletableFuture<WriteIntentSwitchReplicatedInfo> commandReplicatedFuture =
+                                            applyWriteIntentSwitchCommand(
+                                                    request.txId(),
+                                                    request.commit(),
+                                                    request.commitTimestamp(),
+                                                    
request.commitTimestampLong(),
+                                                    catalogVersion
+                                            );
 
-                                    return nullCompletedFuture();
+                                    return new ReplicaResult(null, 
commandReplicatedFuture);
                                 });
                     } else {
-                        return nullCompletedFuture();
+                        return completedFuture(
+                                new ReplicaResult(new 
WriteIntentSwitchReplicatedInfo(request.txId(), replicationGroupId), null)
+                        );
                     }
                 });
     }
@@ -1766,7 +1770,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .thenApply(v -> new 
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
     }
 
-    private CompletableFuture<Void> applyWriteIntentSwitchCommand(
+    private CompletableFuture<WriteIntentSwitchReplicatedInfo> 
applyWriteIntentSwitchCommand(
             UUID transactionId,
             boolean commit,
             HybridTimestamp commitTimestamp,
@@ -1798,7 +1802,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return nullCompletedFuture();
                 })
-                .thenApply(res -> null);
+                .thenApply(res -> new 
WriteIntentSwitchReplicatedInfo(transactionId, replicationGroupId));
     }
 
     /**
@@ -1934,7 +1938,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
 
         if (cleanupReadyFut.isCompletedExceptionally()) {
-            return failedFuture(new 
TransactionException(TX_ALREADY_FINISHED_ERR, "Transaction is already 
finished."));
+            TxStateMeta txStateMeta = txManager.stateMeta(txId);
+
+            TxState txState = txStateMeta == null ? null : 
txStateMeta.txState();
+
+            return failedFuture(new TransactionException(
+                    TX_ALREADY_FINISHED_ERR,
+                    "Transaction is already finished txId=[" + txId + ", 
txState=" + txState + "]."
+            ));
         }
 
         CompletableFuture<T> fut = op.get();
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 58bf55228b..686e86f8bc 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -43,6 +43,8 @@ public class TxStateMeta implements TransactionMeta {
 
     private final Long initialVacuumObservationTimestamp;
 
+    private final Long cleanupCompletionTimestamp;
+
     /**
      * Constructor.
      *
@@ -57,7 +59,7 @@ public class TxStateMeta implements TransactionMeta {
             @Nullable TablePartitionId commitPartitionId,
             @Nullable HybridTimestamp commitTimestamp
     ) {
-        this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 
null);
+        this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 
null, null);
     }
 
     /**
@@ -67,7 +69,6 @@ public class TxStateMeta implements TransactionMeta {
      * @param txCoordinatorId Transaction coordinator id.
      * @param commitPartitionId Commit partition replication group id.
      * @param commitTimestamp Commit timestamp.
-     * @param initialVacuumObservationTimestamp Initial vacuum observation 
timestamp.
      */
     public TxStateMeta(
             TxState txState,
@@ -75,12 +76,34 @@ public class TxStateMeta implements TransactionMeta {
             @Nullable TablePartitionId commitPartitionId,
             @Nullable HybridTimestamp commitTimestamp,
             @Nullable Long initialVacuumObservationTimestamp
+    ) {
+        this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 
initialVacuumObservationTimestamp, null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param txState Transaction state.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @param commitPartitionId Commit partition replication group id.
+     * @param commitTimestamp Commit timestamp.
+     * @param initialVacuumObservationTimestamp Initial vacuum observation 
timestamp.
+     * @param cleanupCompletionTimestamp Cleanup completion timestamp.
+     */
+    public TxStateMeta(
+            TxState txState,
+            @Nullable String txCoordinatorId,
+            @Nullable TablePartitionId commitPartitionId,
+            @Nullable HybridTimestamp commitTimestamp,
+            @Nullable Long initialVacuumObservationTimestamp,
+            @Nullable Long cleanupCompletionTimestamp
     ) {
         this.txState = txState;
         this.txCoordinatorId = txCoordinatorId;
         this.commitPartitionId = commitPartitionId;
         this.commitTimestamp = commitTimestamp;
         this.initialVacuumObservationTimestamp = 
initialVacuumObservationTimestamp;
+        this.cleanupCompletionTimestamp = cleanupCompletionTimestamp;
     }
 
     /**
@@ -125,6 +148,10 @@ public class TxStateMeta implements TransactionMeta {
         return initialVacuumObservationTimestamp;
     }
 
+    public @Nullable Long cleanupCompletionTimestamp() {
+        return cleanupCompletionTimestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -147,12 +174,26 @@ public class TxStateMeta implements TransactionMeta {
             return false;
         }
 
-        return commitTimestamp != null ? 
commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp == null;
+        if (commitTimestamp != null ? 
!commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp != null) {
+            return false;
+        }
+
+        if (initialVacuumObservationTimestamp != null
+                ? 
!initialVacuumObservationTimestamp.equals(that.initialVacuumObservationTimestamp)
+                : that.initialVacuumObservationTimestamp != null
+        ) {
+            return false;
+        }
+
+        return cleanupCompletionTimestamp != null
+                ? 
cleanupCompletionTimestamp.equals(that.cleanupCompletionTimestamp)
+                : that.cleanupCompletionTimestamp == null;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(txState, txCoordinatorId, commitPartitionId, 
commitTimestamp);
+        return Objects.hash(txState, txCoordinatorId, commitPartitionId, 
commitTimestamp, initialVacuumObservationTimestamp,
+                cleanupCompletionTimestamp);
     }
 
     @Override
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
index 5b668a46b5..26dc59cfba 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -76,6 +77,13 @@ public class PlacementDriverHelper {
     public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaWithExceptionHandling(TablePartitionId partitionId) {
         HybridTimestamp timestamp = clockService.now();
 
+        return awaitPrimaryReplicaWithExceptionHandling(partitionId, 
timestamp);
+    }
+
+    private CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaWithExceptionHandling(
+            TablePartitionId partitionId,
+            HybridTimestamp timestamp
+    ) {
         return placementDriver.awaitPrimaryReplica(partitionId, timestamp, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
                 .handle((primaryReplica, e) -> {
                     if (e != null) {
@@ -98,6 +106,36 @@ public class PlacementDriverHelper {
      *         failed to find the primary for.
      */
     public CompletableFuture<PartitionData> 
findPrimaryReplicas(Collection<TablePartitionId> partitions) {
+        // Please note that we are using `get primary replica` instead of 
`await primary replica`.
+        // This method is faster, yet we still have the correctness:
+        // If the primary replica has not changed, get will return a valid 
value and we'll send an unlock request to this node.
+        // If the primary replica has expired and get returns null (or a 
different node), the primary node step down logic
+        // will automatically release the locks on that node. All we need to 
do is to clean the storage.
+        return computePrimaryReplicas(partitions, 
placementDriver::getPrimaryReplica);
+    }
+
+    /**
+     * Wait for primary replica to appear for the provided partitions.
+     *
+     * @param partitions A collection of partitions.
+     * @return A future that completes with a map of node to the partitions 
the node is primary for.
+     */
+    public CompletableFuture<Map<String, Set<TablePartitionId>>> 
awaitPrimaryReplicas(Collection<TablePartitionId> partitions) {
+        return computePrimaryReplicas(partitions, 
this::awaitPrimaryReplicaWithExceptionHandling)
+                .thenApply(partitionData -> partitionData.partitionsByNode);
+    }
+
+    /**
+     * Get primary replicas for the provided partitions according to the 
provided placement driver function.
+     *
+     * @param partitions A collection of partitions.
+     * @return A future that completes with a map of node to the partitions 
the node is primary for and a collection of partitions that we
+     *         failed to find the primary for.
+     */
+    private CompletableFuture<PartitionData> computePrimaryReplicas(
+            Collection<TablePartitionId> partitions,
+            BiFunction<TablePartitionId, HybridTimestamp, 
CompletableFuture<ReplicaMeta>> placementFunction
+    ) {
         if (partitions == null || partitions.isEmpty()) {
             return completedFuture(new PartitionData(emptyMap(), emptySet()));
         }
@@ -106,13 +144,8 @@ public class PlacementDriverHelper {
 
         Map<TablePartitionId, CompletableFuture<ReplicaMeta>> 
primaryReplicaFutures = new HashMap<>();
 
-        // Please note that we are using `get primary replica` instead of 
`await primary replica`.
-        // This method is faster, yet we still have the correctness:
-        // If the primary replica has not changed, get will return a valid 
value and we'll send an unlock request to this node.
-        // If the primary replica has expired and get returns null (or a 
different node), the primary node step down logic
-        // will automatically release the locks on that node. All we need to 
do is to clean the storage.
         for (TablePartitionId partitionId : partitions) {
-            primaryReplicaFutures.put(partitionId, 
placementDriver.getPrimaryReplica(partitionId, timestamp));
+            primaryReplicaFutures.put(partitionId, 
placementFunction.apply(partitionId, timestamp));
         }
 
         return allOf(primaryReplicaFutures.values().toArray(new 
CompletableFuture<?>[0]))
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index eb381903ac..7af5d71a0e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -18,21 +18,30 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.stream.Collectors.toSet;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
 import org.apache.ignite.internal.tx.message.TxCleanupMessage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -57,6 +66,9 @@ public class TxCleanupRequestHandler {
     /** Cursor registry. */
     private final RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry;
 
+    /** The map of txId to a cleanup context, tracking replicated write 
intents. */
+    private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated = 
new ConcurrentHashMap<>();
+
     /**
      * The constructor.
      *
@@ -103,6 +115,8 @@ public class TxCleanupRequestHandler {
         Collection<ReplicationGroupId> groups = txCleanupMessage.groups();
 
         if (groups != null) {
+            trackPartitions(txCleanupMessage.txId(), groups, sender);
+
             for (ReplicationGroupId group : groups) {
                 writeIntentSwitches.put((TablePartitionId) group,
                         writeIntentSwitchProcessor.switchLocalWriteIntents(
@@ -110,7 +124,7 @@ public class TxCleanupRequestHandler {
                                 txCleanupMessage.txId(),
                                 txCleanupMessage.commit(),
                                 txCleanupMessage.commitTimestamp()
-                        ));
+                        ).thenAccept(this::processWriteIntentSwitchResponse));
             }
         }
         // First trigger the cleanup to properly release the locks if we know 
all affected partitions on this node.
@@ -136,7 +150,7 @@ public class TxCleanupRequestHandler {
                                         txCleanupMessage.commitTimestamp(),
                                         txCleanupMessage.txId(),
                                         groupId
-                                );
+                                
).thenAccept(this::processWriteIntentSwitchResponse);
                             }
                         });
                     }
@@ -156,6 +170,14 @@ public class TxCleanupRequestHandler {
                 .build();
     }
 
+    private NetworkMessage prepareResponse(CleanupReplicatedInfo result) {
+        return FACTORY
+                .txCleanupMessageResponse()
+                .result(result)
+                .timestampLong(clockService.nowLong())
+                .build();
+    }
+
     private NetworkMessage prepareErrorResponse(Throwable th) {
         return FACTORY
                 .txCleanupMessageErrorResponse()
@@ -163,4 +185,84 @@ public class TxCleanupRequestHandler {
                 .timestampLong(clockService.nowLong())
                 .build();
     }
+
+    /**
+     * Start tracking the cleanup replication process for the provided 
transaction.
+     *
+     * @param txId Transaction id.
+     * @param groups Replication groups.
+     * @param sender Cleanup request sender, needed to send cleanup replicated 
response.
+     */
+    private void trackPartitions(UUID txId, Collection<ReplicationGroupId> 
groups, ClusterNode sender) {
+        Set<TablePartitionId> partitions =
+                groups.stream()
+                        .map(TablePartitionId.class::cast)
+                        .collect(toSet());
+
+        writeIntentsReplicated.put(txId, new CleanupContext(sender, 
partitions, partitions));
+    }
+
+    /**
+     * Process the replication response from a write intent switch request.
+     *
+     * @param response Write intent replication response.
+     */
+    private void processWriteIntentSwitchResponse(ReplicaResponse response) {
+        if (response == null) {
+            return;
+        }
+
+        Object result = response.result();
+
+        assert (result instanceof WriteIntentSwitchReplicatedInfo) :
+                "Unexpected type of cleanup replication response: [result=" + 
result + "].";
+
+        writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo) result);
+    }
+
+    /**
+     * Process the replication response from a write intent switch request.
+     *
+     * @param info Write intent replication info.
+     */
+    void writeIntentSwitchReplicated(WriteIntentSwitchReplicatedInfo info) {
+        CleanupContext cleanupContext = 
writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, context) -> {
+            Set<TablePartitionId> partitions = new 
HashSet<>(context.partitions);
+            partitions.remove(info.partitionId());
+
+            return new CleanupContext(context.sender, partitions, 
context.initialPartitions);
+        });
+
+        if (cleanupContext != null && cleanupContext.partitions.isEmpty()) {
+            // Means all write intents have been replicated.
+            sendCleanupReplicatedResponse(info.txId(), cleanupContext.sender, 
cleanupContext.initialPartitions);
+
+            writeIntentsReplicated.remove(info.txId());
+        }
+    }
+
+    /**
+     * Send cleanup replicated response back to the sender (which is the 
commit partition primary).
+     *
+     * @param txId Transaction id.
+     * @param sender Cleanup request sender.
+     * @param partitions Partitions that we received replication confirmation 
for.
+     */
+    private void sendCleanupReplicatedResponse(UUID txId, ClusterNode sender, 
Collection<TablePartitionId> partitions) {
+        messagingService.send(sender, ChannelType.DEFAULT, prepareResponse(new 
CleanupReplicatedInfo(txId, partitions)));
+    }
+
+    private static class CleanupContext {
+        private final ClusterNode sender;
+
+        private final Set<TablePartitionId> partitions;
+
+        private final Set<TablePartitionId> initialPartitions;
+
+        public CleanupContext(ClusterNode sender, Set<TablePartitionId> 
partitions, Set<TablePartitionId> initialPartitions) {
+            this.sender = sender;
+            this.partitions = partitions;
+            this.initialPartitions = initialPartitions;
+        }
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 9849f647d8..f5b235b372 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -29,10 +29,17 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
 import 
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.jetbrains.annotations.Nullable;
 
@@ -43,26 +50,71 @@ public class TxCleanupRequestSender {
     /** Placement driver helper. */
     private final PlacementDriverHelper placementDriverHelper;
 
-    /** Cleanup processor. */
-    private final WriteIntentSwitchProcessor writeIntentSwitchProcessor;
-
     private final TxMessageSender txMessageSender;
 
+    /** The map of txId to a cleanup context, tracking partitions with 
replicated write intents. */
+    private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated = 
new ConcurrentHashMap<>();
+
+    /** Local transaction state storage. */
+    private final VolatileTxStateMetaStorage txStateVolatileStorage;
+
     /**
      * The constructor.
      *
      * @param txMessageSender Message sender.
      * @param placementDriverHelper Placement driver helper.
-     * @param writeIntentSwitchProcessor A cleanup processor.
+     * @param txStateVolatileStorage Volatile transaction state storage.
      */
     public TxCleanupRequestSender(
             TxMessageSender txMessageSender,
             PlacementDriverHelper placementDriverHelper,
-            WriteIntentSwitchProcessor writeIntentSwitchProcessor
+            VolatileTxStateMetaStorage txStateVolatileStorage
     ) {
         this.txMessageSender = txMessageSender;
         this.placementDriverHelper = placementDriverHelper;
-        this.writeIntentSwitchProcessor = writeIntentSwitchProcessor;
+        this.txStateVolatileStorage = txStateVolatileStorage;
+    }
+
+    /**
+     * Starts the request sender.
+     */
+    public void start() {
+        
txMessageSender.messagingService().addMessageHandler(TxMessageGroup.class, 
(msg, sender, correlationId) -> {
+            if (msg instanceof TxCleanupMessageResponse && correlationId == 
null) {
+                CleanupReplicatedInfo result = ((TxCleanupMessageResponse) 
msg).result();
+
+                if (result != null) {
+                    onCleanupReplicated(result);
+                }
+            }
+        });
+    }
+
+    private void onCleanupReplicated(CleanupReplicatedInfo info) {
+        CleanupContext ctx = 
writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, cleanupContext) -> {
+            cleanupContext.partitions.removeAll(info.partitions());
+
+            return cleanupContext;
+        });
+
+        if (ctx != null && ctx.partitions.isEmpty()) {
+            markTxnCleanupReplicated(info.txId(), ctx.txState);
+
+            writeIntentsReplicated.remove(info.txId());
+        }
+    }
+
+    private void markTxnCleanupReplicated(UUID txId, TxState state) {
+        long cleanupCompletionTimestamp = System.currentTimeMillis();
+
+        txStateVolatileStorage.updateMeta(txId, oldMeta ->
+                new TxStateMeta(oldMeta == null ? state : oldMeta.txState(),
+                        oldMeta == null ? null : oldMeta.txCoordinatorId(),
+                        oldMeta == null ? null : oldMeta.commitPartitionId(),
+                        oldMeta == null ? null : oldMeta.commitTimestamp(),
+                        oldMeta == null ? null : 
oldMeta.initialVacuumObservationTimestamp(),
+                        cleanupCompletionTimestamp)
+        );
     }
 
     /**
@@ -91,6 +143,9 @@ public class TxCleanupRequestSender {
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
+        // Start tracking the partitions we want to learn the replication 
confirmation from.
+        writeIntentsReplicated.put(txId, new 
CleanupContext(enlistedPartitions.keySet(), commit ? TxState.COMMITTED : 
TxState.ABORTED));
+
         Map<String, Set<TablePartitionId>> partitions = new HashMap<>();
         enlistedPartitions.forEach((partitionId, nodeId) ->
                 partitions.computeIfAbsent(nodeId, node -> new 
HashSet<>()).add(partitionId));
@@ -106,24 +161,22 @@ public class TxCleanupRequestSender {
     ) {
         return placementDriverHelper.findPrimaryReplicas(partitionIds)
                 .thenCompose(partitionData -> {
-                    switchWriteIntentsOnPartitions(commit, commitTimestamp, 
txId, partitionData.partitionsWithoutPrimary);
+                    cleanupPartitionsWithoutPrimary(commit, commitTimestamp, 
txId, partitionData.partitionsWithoutPrimary);
 
                     return cleanupPartitions(partitionData.partitionsByNode, 
commit, commitTimestamp, txId);
                 });
     }
 
-    private void switchWriteIntentsOnPartitions(
+    private void cleanupPartitionsWithoutPrimary(
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
             Set<TablePartitionId> noPrimaryFound
     ) {
-        for (TablePartitionId partition : noPrimaryFound) {
-            // Okay, no primary found for that partition.
-            // Means the old one is no longer primary thus the locks were 
released.
-            // All we need to do is to wait for the new primary to appear and 
cleanup write intents.
-            writeIntentSwitchProcessor.switchWriteIntentsWithRetry(commit, 
commitTimestamp, txId, partition);
-        }
+        // For the partitions without primary, we need to wait until a new 
primary is found.
+        // Then we can proceed with the common cleanup flow.
+        placementDriverHelper.awaitPrimaryReplicas(noPrimaryFound)
+                .thenCompose(partitionsByNode -> 
cleanupPartitions(partitionsByNode, commit, commitTimestamp, txId));
     }
 
     private CompletableFuture<Void> cleanupPartitions(
@@ -170,6 +223,8 @@ public class TxCleanupRequestSender {
                                 return sendCleanupMessageWithRetries(commit, 
commitTimestamp, txId, node, partitions);
                             }
 
+                            // Run a cleanup that finds new primaries for the 
given partitions.
+                            // This covers the case when a partition primary 
died and we still want to switch write intents.
                             return cleanup(partitions, commit, 
commitTimestamp, txId);
                         }
 
@@ -180,4 +235,24 @@ public class TxCleanupRequestSender {
                 })
                 .thenCompose(v -> v);
     }
+
+    private static class CleanupContext {
+
+        /**
+         * The partitions the we have not received write intent replication 
confirmation for.
+         */
+        private final Set<TablePartitionId> partitions;
+
+        /**
+         * The state of the transaction.
+         */
+        private final TxState txState;
+
+        public CleanupContext(Set<TablePartitionId> partitions, TxState 
txState) {
+            this.partitions = partitions;
+
+            this.txState = txState;
+        }
+    }
+
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 11bfee04a0..5455b70f9e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import 
org.apache.ignite.internal.tx.impl.TransactionInflights.ReadWriteTxContext;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -318,7 +319,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 resourcesRegistry
         );
 
-        txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, writeIntentSwitchProcessor);
+        txCleanupRequestSender =
+                new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, txStateVolatileStorage);
     }
 
     private CompletableFuture<Boolean> 
primaryReplicaEventListener(PrimaryReplicaEventParameters eventParameters) {
@@ -715,6 +717,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         orphanDetector.start(txStateVolatileStorage, 
txConfig.abandonedCheckTs());
 
+        txCleanupRequestSender.start();
+
         txCleanupRequestHandler.start();
 
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaEventListener);
@@ -831,6 +835,10 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         if (result instanceof UUID) {
             transactionInflights.removeInflight((UUID) result);
         }
+
+        if (result instanceof WriteIntentSwitchReplicatedInfo) {
+            
txCleanupRequestHandler.writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo)
 result);
+        }
     }
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 6822eaaa32..951cb896a8 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TransactionResult;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -84,7 +85,7 @@ public class TxMessageSender {
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
      * @return Completable future of Void.
      */
-    public CompletableFuture<Void> switchWriteIntents(
+    public CompletableFuture<ReplicaResponse> switchWriteIntents(
             String primaryConsistentId,
             TablePartitionId tablePartitionId,
             UUID txId,
@@ -216,4 +217,8 @@ public class TxMessageSender {
                     return (TxStateResponse) resp;
                 });
     }
+
+    public MessagingService messagingService() {
+        return messagingService;
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
index f811e579a9..ac8c47cae0 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import 
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -65,7 +66,7 @@ public class WriteIntentSwitchProcessor {
     /**
      * Run switch write intent on the provided node.
      */
-    public CompletableFuture<Void> switchLocalWriteIntents(
+    public CompletableFuture<ReplicaResponse> switchLocalWriteIntents(
             TablePartitionId tablePartitionId,
             UUID txId,
             boolean commit,
@@ -79,7 +80,7 @@ public class WriteIntentSwitchProcessor {
     /**
      * Run switch write intent on the primary node of the provided partition 
in a durable manner.
      */
-    public CompletableFuture<Void> switchWriteIntentsWithRetry(
+    public CompletableFuture<ReplicaResponse> switchWriteIntentsWithRetry(
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
@@ -100,10 +101,10 @@ public class WriteIntentSwitchProcessor {
 
                         LOG.info("Failed to switch write intents for Tx 
[txId={}].", txId, ex);
 
-                        return CompletableFuture.<Void>failedFuture(ex);
+                        return 
CompletableFuture.<ReplicaResponse>failedFuture(ex);
                     }
 
-                    return CompletableFutures.<Void>nullCompletedFuture();
+                    return 
CompletableFutures.<ReplicaResponse>nullCompletedFuture();
                 })
                 .thenCompose(Function.identity());
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java
similarity index 54%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java
index 446a0ffb39..060e40af38 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java
@@ -17,12 +17,32 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 
 /**
- * Cleanup transaction message response.
+ * The result of a replicated cleanup request.
  */
-@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
-public interface TxCleanupMessageResponse extends TimestampAware {
+public class CleanupReplicatedInfo implements Serializable {
+
+    private static final long serialVersionUID = -975001033274630774L;
+
+    private final UUID txId;
+
+    private final Collection<TablePartitionId> partitions;
+
+    public CleanupReplicatedInfo(UUID txId, Collection<TablePartitionId> 
partitions) {
+        this.txId = txId;
+        this.partitions = partitions;
+    }
+
+    public UUID txId() {
+        return txId;
+    }
+
+    public Collection<TablePartitionId> partitions() {
+        return partitions;
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
index 446a0ffb39..39cbd7aae9 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
@@ -17,12 +17,18 @@
 
 package org.apache.ignite.internal.tx.message;
 
+import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Cleanup transaction message response.
  */
 @Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
 public interface TxCleanupMessageResponse extends TimestampAware {
+
+    @Nullable
+    @Marshallable
+    CleanupReplicatedInfo result();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicatedInfo.java
similarity index 50%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicatedInfo.java
index 446a0ffb39..725b843a52 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicatedInfo.java
@@ -17,12 +17,37 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tostring.S;
 
 /**
- * Cleanup transaction message response.
+ * The result of a replicated write intent switch request.
  */
-@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
-public interface TxCleanupMessageResponse extends TimestampAware {
+public class WriteIntentSwitchReplicatedInfo implements Serializable {
+
+    private static final long serialVersionUID = 8130171618503063413L;
+
+    private final UUID txId;
+
+    private final TablePartitionId partitionId;
+
+    public WriteIntentSwitchReplicatedInfo(UUID txId, TablePartitionId 
partitionId) {
+        this.txId = txId;
+        this.partitionId = partitionId;
+    }
+
+    public UUID txId() {
+        return txId;
+    }
+
+    public TablePartitionId partitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(WriteIntentSwitchReplicatedInfo.class, this);
+    }
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index cb0eaf4554..918f849558 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -29,10 +29,10 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
@@ -59,7 +59,7 @@ import 
org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
-import org.apache.ignite.internal.tx.impl.WriteIntentSwitchProcessor;
+import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
@@ -104,8 +104,6 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
     private TransactionIdGenerator idGenerator;
 
-    private WriteIntentSwitchProcessor writeIntentSwitchProcessor;
-
     private TxMessageSender txMessageSender;
 
     /** Init test callback. */
@@ -128,9 +126,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         PlacementDriverHelper placementDriverHelper = new 
PlacementDriverHelper(placementDriver, clockService);
 
-        writeIntentSwitchProcessor = spy(new 
WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, 
topologyService));
-
-        cleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, writeIntentSwitchProcessor);
+        cleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, mock(
+                VolatileTxStateMetaStorage.class));
     }
 
     @Test
@@ -153,7 +150,6 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         assertThat(cleanup, willCompleteSuccessfully());
 
-        verifyNoInteractions(writeIntentSwitchProcessor);
         verify(txMessageSender, times(1)).cleanup(any(), any(), any(), 
anyBoolean(), any());
         verifyNoMoreInteractions(txMessageSender);
     }
@@ -190,8 +186,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         assertThat(cleanup, willCompleteSuccessfully());
 
-        verify(txMessageSender, times(1)).switchWriteIntents(any(), any(), 
any(), anyBoolean(), any());
-        verify(txMessageSender, times(2)).cleanup(any(), any(), any(), 
anyBoolean(), any());
+        verify(txMessageSender, times(3)).cleanup(any(), any(), any(), 
anyBoolean(), any());
         verifyNoMoreInteractions(txMessageSender);
     }
 
@@ -225,8 +220,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         assertThat(cleanup, willCompleteSuccessfully());
 
-        verify(txMessageSender, times(3)).switchWriteIntents(any(), any(), 
any(), anyBoolean(), any());
-        verify(txMessageSender, times(1)).cleanup(any(), any(), any(), 
anyBoolean(), any());
+        verify(txMessageSender, times(2)).cleanup(any(), any(), any(), 
anyBoolean(), any());
 
         verifyNoMoreInteractions(txMessageSender);
     }

Reply via email to