This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c41a26f2d8 IGNITE-21761 Within commitPartition mark txnState with
cleanup finished timestamp (#3496)
c41a26f2d8 is described below
commit c41a26f2d85d95468846624192b353f722c1395d
Author: Cyrill <[email protected]>
AuthorDate: Tue Apr 2 16:40:00 2024 +0300
IGNITE-21761 Within commitPartition mark txnState with cleanup finished
timestamp (#3496)
---
.../ignite/internal/replicator/ReplicaManager.java | 2 -
.../ignite/internal/table/ItDurableFinishTest.java | 53 +++++++++++
.../replicator/PartitionReplicaListener.java | 41 +++++---
.../org/apache/ignite/internal/tx/TxStateMeta.java | 49 +++++++++-
.../internal/tx/impl/PlacementDriverHelper.java | 45 +++++++--
.../internal/tx/impl/TxCleanupRequestHandler.java | 106 ++++++++++++++++++++-
.../internal/tx/impl/TxCleanupRequestSender.java | 103 +++++++++++++++++---
.../ignite/internal/tx/impl/TxManagerImpl.java | 10 +-
.../ignite/internal/tx/impl/TxMessageSender.java | 7 +-
.../tx/impl/WriteIntentSwitchProcessor.java | 9 +-
...ageResponse.java => CleanupReplicatedInfo.java} | 30 +++++-
.../tx/message/TxCleanupMessageResponse.java | 6 ++
...e.java => WriteIntentSwitchReplicatedInfo.java} | 35 ++++++-
.../apache/ignite/internal/tx/TxCleanupTest.java | 18 ++--
14 files changed, 443 insertions(+), 71 deletions(-)
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b721e39df3..b9a17a5d41 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -351,8 +351,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
if (ex == null && res.replicationFuture() != null) {
- assert request instanceof PrimaryReplicaRequest;
-
res.replicationFuture().whenComplete((res0, ex0) -> {
NetworkMessage msg0;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 06ca6325f1..3da2b2425d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -28,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
@@ -47,7 +49,11 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -314,6 +320,53 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
storage.put(tx.id(), txMetaToSet);
}
+
+ @Test
+ void testCleanupReplicatedMessage() throws ExecutionException,
InterruptedException {
+ Context context = prepareTransactionData();
+
+ DefaultMessagingService primaryMessaging =
messaging(context.primaryNode);
+
+ CompletableFuture<Void> cleanupReplicatedFuture = new
CompletableFuture<>();
+
+ primaryMessaging.dropMessages((s, networkMessage) -> {
+ if (networkMessage instanceof TxCleanupMessageResponse) {
+ logger().info("Received message: {}.", networkMessage);
+
+ TxCleanupMessageResponse message = (TxCleanupMessageResponse)
networkMessage;
+
+ if (message instanceof TxCleanupMessageErrorResponse) {
+ TxCleanupMessageErrorResponse error =
(TxCleanupMessageErrorResponse) message;
+
+ logger().error("Cleanup Error: ", error);
+
+ return false;
+ }
+
+ CleanupReplicatedInfo result = message.result();
+
+ if (result != null) {
+ cleanupReplicatedFuture.complete(null);
+ }
+ }
+
+ return false;
+ });
+
+ commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+ assertThat(cleanupReplicatedFuture, willCompleteSuccessfully());
+
+ assertTrue(waitForCondition(
+ () -> {
+ TxStateMeta stateMeta =
context.primaryNode.txManager().stateMeta(context.tx.id());
+
+ return stateMeta != null &&
stateMeta.cleanupCompletionTimestamp() != null;
+ },
+ 10_000)
+ );
+ }
+
private @Nullable Integer nodeIndex(String name) {
for (int i = 0; i < initialNodes(); i++) {
if (node(i).name().equals(name)) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index e5b42932a6..76401c9fd8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -177,6 +177,7 @@ import
org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
@@ -1711,9 +1712,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
* This operation is idempotent, so it's safe to retry it.
*
* @param request Transaction cleanup request.
- * @return CompletableFuture of void.
+ * @return CompletableFuture of ReplicaResult.
*/
- private CompletableFuture<Void>
processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
+ private CompletableFuture<ReplicaResult>
processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
markFinished(request.txId(), request.commit() ? COMMITTED : ABORTED,
request.commitTimestamp());
return awaitCleanupReadyFutures(request.txId(), request.commit())
@@ -1722,19 +1723,22 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridTimestamp commandTimestamp = clockService.now();
return reliableCatalogVersionFor(commandTimestamp)
- .thenCompose(catalogVersion -> {
- applyWriteIntentSwitchCommand(
- request.txId(),
- request.commit(),
- request.commitTimestamp(),
- request.commitTimestampLong(),
- catalogVersion
- );
+ .thenApply(catalogVersion -> {
+
CompletableFuture<WriteIntentSwitchReplicatedInfo> commandReplicatedFuture =
+ applyWriteIntentSwitchCommand(
+ request.txId(),
+ request.commit(),
+ request.commitTimestamp(),
+
request.commitTimestampLong(),
+ catalogVersion
+ );
- return nullCompletedFuture();
+ return new ReplicaResult(null,
commandReplicatedFuture);
});
} else {
- return nullCompletedFuture();
+ return completedFuture(
+ new ReplicaResult(new
WriteIntentSwitchReplicatedInfo(request.txId(), replicationGroupId), null)
+ );
}
});
}
@@ -1766,7 +1770,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
.thenApply(v -> new
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
}
- private CompletableFuture<Void> applyWriteIntentSwitchCommand(
+ private CompletableFuture<WriteIntentSwitchReplicatedInfo>
applyWriteIntentSwitchCommand(
UUID transactionId,
boolean commit,
HybridTimestamp commitTimestamp,
@@ -1798,7 +1802,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return nullCompletedFuture();
})
- .thenApply(res -> null);
+ .thenApply(res -> new
WriteIntentSwitchReplicatedInfo(transactionId, replicationGroupId));
}
/**
@@ -1934,7 +1938,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
if (cleanupReadyFut.isCompletedExceptionally()) {
- return failedFuture(new
TransactionException(TX_ALREADY_FINISHED_ERR, "Transaction is already
finished."));
+ TxStateMeta txStateMeta = txManager.stateMeta(txId);
+
+ TxState txState = txStateMeta == null ? null :
txStateMeta.txState();
+
+ return failedFuture(new TransactionException(
+ TX_ALREADY_FINISHED_ERR,
+ "Transaction is already finished txId=[" + txId + ",
txState=" + txState + "]."
+ ));
}
CompletableFuture<T> fut = op.get();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 58bf55228b..686e86f8bc 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -43,6 +43,8 @@ public class TxStateMeta implements TransactionMeta {
private final Long initialVacuumObservationTimestamp;
+ private final Long cleanupCompletionTimestamp;
+
/**
* Constructor.
*
@@ -57,7 +59,7 @@ public class TxStateMeta implements TransactionMeta {
@Nullable TablePartitionId commitPartitionId,
@Nullable HybridTimestamp commitTimestamp
) {
- this(txState, txCoordinatorId, commitPartitionId, commitTimestamp,
null);
+ this(txState, txCoordinatorId, commitPartitionId, commitTimestamp,
null, null);
}
/**
@@ -67,7 +69,6 @@ public class TxStateMeta implements TransactionMeta {
* @param txCoordinatorId Transaction coordinator id.
* @param commitPartitionId Commit partition replication group id.
* @param commitTimestamp Commit timestamp.
- * @param initialVacuumObservationTimestamp Initial vacuum observation
timestamp.
*/
public TxStateMeta(
TxState txState,
@@ -75,12 +76,34 @@ public class TxStateMeta implements TransactionMeta {
@Nullable TablePartitionId commitPartitionId,
@Nullable HybridTimestamp commitTimestamp,
@Nullable Long initialVacuumObservationTimestamp
+ ) {
+ this(txState, txCoordinatorId, commitPartitionId, commitTimestamp,
initialVacuumObservationTimestamp, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param txState Transaction state.
+ * @param txCoordinatorId Transaction coordinator id.
+ * @param commitPartitionId Commit partition replication group id.
+ * @param commitTimestamp Commit timestamp.
+ * @param initialVacuumObservationTimestamp Initial vacuum observation
timestamp.
+ * @param cleanupCompletionTimestamp Cleanup completion timestamp.
+ */
+ public TxStateMeta(
+ TxState txState,
+ @Nullable String txCoordinatorId,
+ @Nullable TablePartitionId commitPartitionId,
+ @Nullable HybridTimestamp commitTimestamp,
+ @Nullable Long initialVacuumObservationTimestamp,
+ @Nullable Long cleanupCompletionTimestamp
) {
this.txState = txState;
this.txCoordinatorId = txCoordinatorId;
this.commitPartitionId = commitPartitionId;
this.commitTimestamp = commitTimestamp;
this.initialVacuumObservationTimestamp =
initialVacuumObservationTimestamp;
+ this.cleanupCompletionTimestamp = cleanupCompletionTimestamp;
}
/**
@@ -125,6 +148,10 @@ public class TxStateMeta implements TransactionMeta {
return initialVacuumObservationTimestamp;
}
+ public @Nullable Long cleanupCompletionTimestamp() {
+ return cleanupCompletionTimestamp;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -147,12 +174,26 @@ public class TxStateMeta implements TransactionMeta {
return false;
}
- return commitTimestamp != null ?
commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp == null;
+ if (commitTimestamp != null ?
!commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp != null) {
+ return false;
+ }
+
+ if (initialVacuumObservationTimestamp != null
+ ?
!initialVacuumObservationTimestamp.equals(that.initialVacuumObservationTimestamp)
+ : that.initialVacuumObservationTimestamp != null
+ ) {
+ return false;
+ }
+
+ return cleanupCompletionTimestamp != null
+ ?
cleanupCompletionTimestamp.equals(that.cleanupCompletionTimestamp)
+ : that.cleanupCompletionTimestamp == null;
}
@Override
public int hashCode() {
- return Objects.hash(txState, txCoordinatorId, commitPartitionId,
commitTimestamp);
+ return Objects.hash(txState, txCoordinatorId, commitPartitionId,
commitTimestamp, initialVacuumObservationTimestamp,
+ cleanupCompletionTimestamp);
}
@Override
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
index 5b668a46b5..26dc59cfba 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -76,6 +77,13 @@ public class PlacementDriverHelper {
public CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaWithExceptionHandling(TablePartitionId partitionId) {
HybridTimestamp timestamp = clockService.now();
+ return awaitPrimaryReplicaWithExceptionHandling(partitionId,
timestamp);
+ }
+
+ private CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaWithExceptionHandling(
+ TablePartitionId partitionId,
+ HybridTimestamp timestamp
+ ) {
return placementDriver.awaitPrimaryReplica(partitionId, timestamp,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
.handle((primaryReplica, e) -> {
if (e != null) {
@@ -98,6 +106,36 @@ public class PlacementDriverHelper {
* failed to find the primary for.
*/
public CompletableFuture<PartitionData>
findPrimaryReplicas(Collection<TablePartitionId> partitions) {
+ // Please note that we are using `get primary replica` instead of
`await primary replica`.
+ // This method is faster, yet correctness is preserved:
+ // If the primary replica has not changed, get will return a valid
value and we'll send an unlock request to this node.
+ // If the primary replica has expired and get returns null (or a
different node), the primary node step down logic
+ // will automatically release the locks on that node. All we need to
do is to clean the storage.
+ return computePrimaryReplicas(partitions,
placementDriver::getPrimaryReplica);
+ }
+
+ /**
+ * Wait for primary replica to appear for the provided partitions.
+ *
+ * @param partitions A collection of partitions.
+ * @return A future that completes with a map of node to the partitions
the node is primary for.
+ */
+ public CompletableFuture<Map<String, Set<TablePartitionId>>>
awaitPrimaryReplicas(Collection<TablePartitionId> partitions) {
+ return computePrimaryReplicas(partitions,
this::awaitPrimaryReplicaWithExceptionHandling)
+ .thenApply(partitionData -> partitionData.partitionsByNode);
+ }
+
+ /**
+ * Get primary replicas for the provided partitions according to the
provided placement driver function.
+ *
+ * @param partitions A collection of partitions.
+ * @return A future that completes with a map of node to the partitions
the node is primary for and a collection of partitions that we
+ * failed to find the primary for.
+ */
+ private CompletableFuture<PartitionData> computePrimaryReplicas(
+ Collection<TablePartitionId> partitions,
+ BiFunction<TablePartitionId, HybridTimestamp,
CompletableFuture<ReplicaMeta>> placementFunction
+ ) {
if (partitions == null || partitions.isEmpty()) {
return completedFuture(new PartitionData(emptyMap(), emptySet()));
}
@@ -106,13 +144,8 @@ public class PlacementDriverHelper {
Map<TablePartitionId, CompletableFuture<ReplicaMeta>>
primaryReplicaFutures = new HashMap<>();
- // Please note that we are using `get primary replica` instead of
`await primary replica`.
- // This method is faster, yet we still have the correctness:
- // If the primary replica has not changed, get will return a valid
value and we'll send an unlock request to this node.
- // If the primary replica has expired and get returns null (or a
different node), the primary node step down logic
- // will automatically release the locks on that node. All we need to
do is to clean the storage.
for (TablePartitionId partitionId : partitions) {
- primaryReplicaFutures.put(partitionId,
placementDriver.getPrimaryReplica(partitionId, timestamp));
+ primaryReplicaFutures.put(partitionId,
placementFunction.apply(partitionId, timestamp));
}
return allOf(primaryReplicaFutures.values().toArray(new
CompletableFuture<?>[0]))
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index eb381903ac..7af5d71a0e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -18,21 +18,30 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.stream.Collectors.toSet;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -57,6 +66,9 @@ public class TxCleanupRequestHandler {
/** Cursor registry. */
private final RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry;
+ /** The map of txId to a cleanup context, tracking replicated write
intents. */
+ private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated =
new ConcurrentHashMap<>();
+
/**
* The constructor.
*
@@ -103,6 +115,8 @@ public class TxCleanupRequestHandler {
Collection<ReplicationGroupId> groups = txCleanupMessage.groups();
if (groups != null) {
+ trackPartitions(txCleanupMessage.txId(), groups, sender);
+
for (ReplicationGroupId group : groups) {
writeIntentSwitches.put((TablePartitionId) group,
writeIntentSwitchProcessor.switchLocalWriteIntents(
@@ -110,7 +124,7 @@ public class TxCleanupRequestHandler {
txCleanupMessage.txId(),
txCleanupMessage.commit(),
txCleanupMessage.commitTimestamp()
- ));
+ ).thenAccept(this::processWriteIntentSwitchResponse));
}
}
// First trigger the cleanup to properly release the locks if we know
all affected partitions on this node.
@@ -136,7 +150,7 @@ public class TxCleanupRequestHandler {
txCleanupMessage.commitTimestamp(),
txCleanupMessage.txId(),
groupId
- );
+
).thenAccept(this::processWriteIntentSwitchResponse);
}
});
}
@@ -156,6 +170,14 @@ public class TxCleanupRequestHandler {
.build();
}
+ private NetworkMessage prepareResponse(CleanupReplicatedInfo result) {
+ return FACTORY
+ .txCleanupMessageResponse()
+ .result(result)
+ .timestampLong(clockService.nowLong())
+ .build();
+ }
+
private NetworkMessage prepareErrorResponse(Throwable th) {
return FACTORY
.txCleanupMessageErrorResponse()
@@ -163,4 +185,84 @@ public class TxCleanupRequestHandler {
.timestampLong(clockService.nowLong())
.build();
}
+
+ /**
+ * Start tracking the cleanup replication process for the provided
transaction.
+ *
+ * @param txId Transaction id.
+ * @param groups Replication groups.
+ * @param sender Cleanup request sender, needed to send cleanup replicated
response.
+ */
+ private void trackPartitions(UUID txId, Collection<ReplicationGroupId>
groups, ClusterNode sender) {
+ Set<TablePartitionId> partitions =
+ groups.stream()
+ .map(TablePartitionId.class::cast)
+ .collect(toSet());
+
+ writeIntentsReplicated.put(txId, new CleanupContext(sender,
partitions, partitions));
+ }
+
+ /**
+ * Process the replication response from a write intent switch request.
+ *
+ * @param response Write intent replication response.
+ */
+ private void processWriteIntentSwitchResponse(ReplicaResponse response) {
+ if (response == null) {
+ return;
+ }
+
+ Object result = response.result();
+
+ assert (result instanceof WriteIntentSwitchReplicatedInfo) :
+ "Unexpected type of cleanup replication response: [result=" +
result + "].";
+
+ writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo) result);
+ }
+
+ /**
+ * Process the replication response from a write intent switch request.
+ *
+ * @param info Write intent replication info.
+ */
+ void writeIntentSwitchReplicated(WriteIntentSwitchReplicatedInfo info) {
+ CleanupContext cleanupContext =
writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, context) -> {
+ Set<TablePartitionId> partitions = new
HashSet<>(context.partitions);
+ partitions.remove(info.partitionId());
+
+ return new CleanupContext(context.sender, partitions,
context.initialPartitions);
+ });
+
+ if (cleanupContext != null && cleanupContext.partitions.isEmpty()) {
+ // Means all write intents have been replicated.
+ sendCleanupReplicatedResponse(info.txId(), cleanupContext.sender,
cleanupContext.initialPartitions);
+
+ writeIntentsReplicated.remove(info.txId());
+ }
+ }
+
+ /**
+ * Send cleanup replicated response back to the sender (which is the
commit partition primary).
+ *
+ * @param txId Transaction id.
+ * @param sender Cleanup request sender.
+ * @param partitions Partitions that we received replication confirmation
for.
+ */
+ private void sendCleanupReplicatedResponse(UUID txId, ClusterNode sender,
Collection<TablePartitionId> partitions) {
+ messagingService.send(sender, ChannelType.DEFAULT, prepareResponse(new
CleanupReplicatedInfo(txId, partitions)));
+ }
+
+ private static class CleanupContext {
+ private final ClusterNode sender;
+
+ private final Set<TablePartitionId> partitions;
+
+ private final Set<TablePartitionId> initialPartitions;
+
+ public CleanupContext(ClusterNode sender, Set<TablePartitionId>
partitions, Set<TablePartitionId> initialPartitions) {
+ this.sender = sender;
+ this.partitions = partitions;
+ this.initialPartitions = initialPartitions;
+ }
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 9849f647d8..f5b235b372 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -29,10 +29,17 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
import
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;
@@ -43,26 +50,71 @@ public class TxCleanupRequestSender {
/** Placement driver helper. */
private final PlacementDriverHelper placementDriverHelper;
- /** Cleanup processor. */
- private final WriteIntentSwitchProcessor writeIntentSwitchProcessor;
-
private final TxMessageSender txMessageSender;
+ /** The map of txId to a cleanup context, tracking partitions with
replicated write intents. */
+ private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated =
new ConcurrentHashMap<>();
+
+ /** Local transaction state storage. */
+ private final VolatileTxStateMetaStorage txStateVolatileStorage;
+
/**
* The constructor.
*
* @param txMessageSender Message sender.
* @param placementDriverHelper Placement driver helper.
- * @param writeIntentSwitchProcessor A cleanup processor.
+ * @param txStateVolatileStorage Volatile transaction state storage.
*/
public TxCleanupRequestSender(
TxMessageSender txMessageSender,
PlacementDriverHelper placementDriverHelper,
- WriteIntentSwitchProcessor writeIntentSwitchProcessor
+ VolatileTxStateMetaStorage txStateVolatileStorage
) {
this.txMessageSender = txMessageSender;
this.placementDriverHelper = placementDriverHelper;
- this.writeIntentSwitchProcessor = writeIntentSwitchProcessor;
+ this.txStateVolatileStorage = txStateVolatileStorage;
+ }
+
+ /**
+ * Starts the request sender.
+ */
+ public void start() {
+
txMessageSender.messagingService().addMessageHandler(TxMessageGroup.class,
(msg, sender, correlationId) -> {
+ if (msg instanceof TxCleanupMessageResponse && correlationId ==
null) {
+ CleanupReplicatedInfo result = ((TxCleanupMessageResponse)
msg).result();
+
+ if (result != null) {
+ onCleanupReplicated(result);
+ }
+ }
+ });
+ }
+
+ private void onCleanupReplicated(CleanupReplicatedInfo info) {
+ CleanupContext ctx =
writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, cleanupContext) -> {
+ cleanupContext.partitions.removeAll(info.partitions());
+
+ return cleanupContext;
+ });
+
+ if (ctx != null && ctx.partitions.isEmpty()) {
+ markTxnCleanupReplicated(info.txId(), ctx.txState);
+
+ writeIntentsReplicated.remove(info.txId());
+ }
+ }
+
+ private void markTxnCleanupReplicated(UUID txId, TxState state) {
+ long cleanupCompletionTimestamp = System.currentTimeMillis();
+
+ txStateVolatileStorage.updateMeta(txId, oldMeta ->
+ new TxStateMeta(oldMeta == null ? state : oldMeta.txState(),
+ oldMeta == null ? null : oldMeta.txCoordinatorId(),
+ oldMeta == null ? null : oldMeta.commitPartitionId(),
+ oldMeta == null ? null : oldMeta.commitTimestamp(),
+ oldMeta == null ? null :
oldMeta.initialVacuumObservationTimestamp(),
+ cleanupCompletionTimestamp)
+ );
}
/**
@@ -91,6 +143,9 @@ public class TxCleanupRequestSender {
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
+ // Start tracking the partitions we want to learn the replication
confirmation from.
+ writeIntentsReplicated.put(txId, new
CleanupContext(enlistedPartitions.keySet(), commit ? TxState.COMMITTED :
TxState.ABORTED));
+
Map<String, Set<TablePartitionId>> partitions = new HashMap<>();
enlistedPartitions.forEach((partitionId, nodeId) ->
partitions.computeIfAbsent(nodeId, node -> new
HashSet<>()).add(partitionId));
@@ -106,24 +161,22 @@ public class TxCleanupRequestSender {
) {
return placementDriverHelper.findPrimaryReplicas(partitionIds)
.thenCompose(partitionData -> {
- switchWriteIntentsOnPartitions(commit, commitTimestamp,
txId, partitionData.partitionsWithoutPrimary);
+ cleanupPartitionsWithoutPrimary(commit, commitTimestamp,
txId, partitionData.partitionsWithoutPrimary);
return cleanupPartitions(partitionData.partitionsByNode,
commit, commitTimestamp, txId);
});
}
- private void switchWriteIntentsOnPartitions(
+ private void cleanupPartitionsWithoutPrimary(
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
Set<TablePartitionId> noPrimaryFound
) {
- for (TablePartitionId partition : noPrimaryFound) {
- // Okay, no primary found for that partition.
- // Means the old one is no longer primary thus the locks were
released.
- // All we need to do is to wait for the new primary to appear and
cleanup write intents.
- writeIntentSwitchProcessor.switchWriteIntentsWithRetry(commit,
commitTimestamp, txId, partition);
- }
+ // For the partitions without primary, we need to wait until a new
primary is found.
+ // Then we can proceed with the common cleanup flow.
+ placementDriverHelper.awaitPrimaryReplicas(noPrimaryFound)
+ .thenCompose(partitionsByNode ->
cleanupPartitions(partitionsByNode, commit, commitTimestamp, txId));
}
private CompletableFuture<Void> cleanupPartitions(
@@ -170,6 +223,8 @@ public class TxCleanupRequestSender {
return sendCleanupMessageWithRetries(commit,
commitTimestamp, txId, node, partitions);
}
+ // Run a cleanup that finds new primaries for the
given partitions.
+ // This covers the case when a partition primary
died and we still want to switch write intents.
return cleanup(partitions, commit,
commitTimestamp, txId);
}
@@ -180,4 +235,24 @@ public class TxCleanupRequestSender {
})
.thenCompose(v -> v);
}
+
+ private static class CleanupContext {
+
+ /**
+ * The partitions that we have not received write intent replication
confirmation for.
+ */
+ private final Set<TablePartitionId> partitions;
+
+ /**
+ * The state of the transaction.
+ */
+ private final TxState txState;
+
+ public CleanupContext(Set<TablePartitionId> partitions, TxState
txState) {
+ this.partitions = partitions;
+
+ this.txState = txState;
+ }
+ }
+
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 11bfee04a0..5455b70f9e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import
org.apache.ignite.internal.tx.impl.TransactionInflights.ReadWriteTxContext;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -318,7 +319,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
resourcesRegistry
);
- txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, writeIntentSwitchProcessor);
+ txCleanupRequestSender =
+ new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, txStateVolatileStorage);
}
private CompletableFuture<Boolean>
primaryReplicaEventListener(PrimaryReplicaEventParameters eventParameters) {
@@ -715,6 +717,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
orphanDetector.start(txStateVolatileStorage,
txConfig.abandonedCheckTs());
+ txCleanupRequestSender.start();
+
txCleanupRequestHandler.start();
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaEventListener);
@@ -831,6 +835,10 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
if (result instanceof UUID) {
transactionInflights.removeInflight((UUID) result);
}
+
+ if (result instanceof WriteIntentSwitchReplicatedInfo) {
+
txCleanupRequestHandler.writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo)
result);
+ }
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 6822eaaa32..951cb896a8 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -84,7 +85,7 @@ public class TxMessageSender {
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
* @return Completable future of Void.
*/
- public CompletableFuture<Void> switchWriteIntents(
+ public CompletableFuture<ReplicaResponse> switchWriteIntents(
String primaryConsistentId,
TablePartitionId tablePartitionId,
UUID txId,
@@ -216,4 +217,8 @@ public class TxMessageSender {
return (TxStateResponse) resp;
});
}
+
+ public MessagingService messagingService() {
+ return messagingService;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
index f811e579a9..ac8c47cae0 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -65,7 +66,7 @@ public class WriteIntentSwitchProcessor {
/**
* Run switch write intent on the provided node.
*/
- public CompletableFuture<Void> switchLocalWriteIntents(
+ public CompletableFuture<ReplicaResponse> switchLocalWriteIntents(
TablePartitionId tablePartitionId,
UUID txId,
boolean commit,
@@ -79,7 +80,7 @@ public class WriteIntentSwitchProcessor {
/**
* Run switch write intent on the primary node of the provided partition
in a durable manner.
*/
- public CompletableFuture<Void> switchWriteIntentsWithRetry(
+ public CompletableFuture<ReplicaResponse> switchWriteIntentsWithRetry(
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
@@ -100,10 +101,10 @@ public class WriteIntentSwitchProcessor {
LOG.info("Failed to switch write intents for Tx
[txId={}].", txId, ex);
- return CompletableFuture.<Void>failedFuture(ex);
+ return
CompletableFuture.<ReplicaResponse>failedFuture(ex);
}
- return CompletableFutures.<Void>nullCompletedFuture();
+ return
CompletableFutures.<ReplicaResponse>nullCompletedFuture();
})
.thenCompose(Function.identity());
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java
similarity index 54%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java
index 446a0ffb39..060e40af38 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java
@@ -17,12 +17,32 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.replicator.TablePartitionId;
/**
- * Cleanup transaction message response.
+ * The result of a replicated cleanup request.
*/
-@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
-public interface TxCleanupMessageResponse extends TimestampAware {
+public class CleanupReplicatedInfo implements Serializable {
+
+ private static final long serialVersionUID = -975001033274630774L;
+
+ private final UUID txId;
+
+ private final Collection<TablePartitionId> partitions;
+
+ public CleanupReplicatedInfo(UUID txId, Collection<TablePartitionId>
partitions) {
+ this.txId = txId;
+ this.partitions = partitions;
+ }
+
+ public UUID txId() {
+ return txId;
+ }
+
+ public Collection<TablePartitionId> partitions() {
+ return partitions;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
index 446a0ffb39..39cbd7aae9 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
@@ -17,12 +17,18 @@
package org.apache.ignite.internal.tx.message;
+import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.jetbrains.annotations.Nullable;
/**
* Cleanup transaction message response.
*/
@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
public interface TxCleanupMessageResponse extends TimestampAware {
+
+ @Nullable
+ @Marshallable
+ CleanupReplicatedInfo result();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicatedInfo.java
similarity index 50%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicatedInfo.java
index 446a0ffb39..725b843a52 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicatedInfo.java
@@ -17,12 +17,37 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tostring.S;
/**
- * Cleanup transaction message response.
+ * The result of a replicated write intent switch request.
*/
-@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
-public interface TxCleanupMessageResponse extends TimestampAware {
+public class WriteIntentSwitchReplicatedInfo implements Serializable {
+
+ private static final long serialVersionUID = 8130171618503063413L;
+
+ private final UUID txId;
+
+ private final TablePartitionId partitionId;
+
+ public WriteIntentSwitchReplicatedInfo(UUID txId, TablePartitionId
partitionId) {
+ this.txId = txId;
+ this.partitionId = partitionId;
+ }
+
+ public UUID txId() {
+ return txId;
+ }
+
+ public TablePartitionId partitionId() {
+ return partitionId;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(WriteIntentSwitchReplicatedInfo.class, this);
+ }
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index cb0eaf4554..918f849558 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -29,10 +29,10 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -59,7 +59,7 @@ import
org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
-import org.apache.ignite.internal.tx.impl.WriteIntentSwitchProcessor;
+import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
@@ -104,8 +104,6 @@ public class TxCleanupTest extends IgniteAbstractTest {
private TransactionIdGenerator idGenerator;
- private WriteIntentSwitchProcessor writeIntentSwitchProcessor;
-
private TxMessageSender txMessageSender;
/** Init test callback. */
@@ -128,9 +126,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
PlacementDriverHelper placementDriverHelper = new
PlacementDriverHelper(placementDriver, clockService);
- writeIntentSwitchProcessor = spy(new
WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender,
topologyService));
-
- cleanupRequestSender = new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, writeIntentSwitchProcessor);
+ cleanupRequestSender = new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, mock(
+ VolatileTxStateMetaStorage.class));
}
@Test
@@ -153,7 +150,6 @@ public class TxCleanupTest extends IgniteAbstractTest {
assertThat(cleanup, willCompleteSuccessfully());
- verifyNoInteractions(writeIntentSwitchProcessor);
verify(txMessageSender, times(1)).cleanup(any(), any(), any(),
anyBoolean(), any());
verifyNoMoreInteractions(txMessageSender);
}
@@ -190,8 +186,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
assertThat(cleanup, willCompleteSuccessfully());
- verify(txMessageSender, times(1)).switchWriteIntents(any(), any(),
any(), anyBoolean(), any());
- verify(txMessageSender, times(2)).cleanup(any(), any(), any(),
anyBoolean(), any());
+ verify(txMessageSender, times(3)).cleanup(any(), any(), any(),
anyBoolean(), any());
verifyNoMoreInteractions(txMessageSender);
}
@@ -225,8 +220,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
assertThat(cleanup, willCompleteSuccessfully());
- verify(txMessageSender, times(3)).switchWriteIntents(any(), any(),
any(), anyBoolean(), any());
- verify(txMessageSender, times(1)).cleanup(any(), any(), any(),
anyBoolean(), any());
+ verify(txMessageSender, times(2)).cleanup(any(), any(), any(),
anyBoolean(), any());
verifyNoMoreInteractions(txMessageSender);
}