This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e0424de400 IGNITE-22147 Fixed flaky
ItTxResourcesVacuumTest.testRecoveryAfterPersistentStateVacuumized (#3689)
e0424de400 is described below
commit e0424de400770c2b669f6faf5e0ff019c5934189
Author: Denis Chudov <[email protected]>
AuthorDate: Fri May 10 13:30:53 2024 +0300
IGNITE-22147 Fixed flaky
ItTxResourcesVacuumTest.testRecoveryAfterPersistentStateVacuumized (#3689)
---
.../internal/table/ItTxResourcesVacuumTest.java | 84 ++++++++++++++--------
.../table/distributed/raft/PartitionListener.java | 4 +-
.../replicator/PartitionReplicaListener.java | 4 +-
.../FinishedTransactionBatchRequestHandler.java | 4 +-
.../internal/tx/impl/TxCleanupRequestHandler.java | 5 +-
.../internal/tx/impl/TxCleanupRequestSender.java | 17 ++++-
.../tx/message/TxCleanupMessageErrorResponse.java | 8 +++
7 files changed, 90 insertions(+), 36 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index ccb91a867b..95f2408f3f 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -24,7 +24,6 @@ import static
org.apache.ignite.internal.table.NodeUtils.transferPrimary;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
@@ -229,7 +228,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
if (finishRequest.txId().equals(txId)) {
finishStartedFuture.complete(null);
- finishAllowedFuture.join();
+ joinWithTimeout(finishAllowedFuture);
}
}
@@ -250,7 +249,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
// Check that the volatile state of the transaction is preserved.
assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
- finishAllowedFuture.complete(null);
+ assertTrue(finishAllowedFuture.complete(null));
assertThat(commitFut, willCompleteSuccessfully());
@@ -382,7 +381,8 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
Set<String> commitPartNodes = partitionAssignment(node, new
TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
- log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].",
commitPartitionLeaseholder.name(), commitPartNodes);
+ log.info("Test: Commit partition [part={}, leaseholder={},
hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+ commitPartNodes);
// Some node that does not host the commit partition, will be the
primary node for upserting another tuple.
IgniteImpl leaseholderForAnotherTuple = findNode(n ->
!commitPartNodes.contains(n.name()));
@@ -405,7 +405,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
log.info("Test: cleanup started.");
if (commitPartNodes.contains(n)) {
- cleanupAllowed.join();
+ joinWithTimeout(cleanupAllowed);
}
}
@@ -425,13 +425,13 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()),
txId, commitPartId, false);
// Unblocking cleanup.
- cleanupAllowed.complete(null);
+ assertTrue(cleanupAllowed.complete(null));
assertThat(commitFut, willCompleteSuccessfully());
Transaction roTxAfter = beginReadOnlyTx(anyNode());
- waitForCondition(() -> volatileTxState(commitPartitionLeaseholder,
txId) != null, 10_000);
+ waitForCleanupCompletion(commitPartNodes, txId);
triggerVacuum();
assertTxStateVacuumized(txId, commitPartId, true);
@@ -498,7 +498,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
cleanupStarted.complete(null);
- cleanupAllowedFut.join();
+ joinWithTimeout(cleanupAllowedFut);
}
return false;
@@ -512,7 +512,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
transferPrimary(cluster.runningNodes().collect(toSet()),
commitPartGrpId, commitPartNodes::contains);
- cleanupAllowedFut.complete(null);
+ assertTrue(cleanupAllowedFut.complete(null));
cleanupAllowed[0] = true;
@@ -570,7 +570,8 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
Set<String> commitPartNodes = partitionAssignment(node, new
TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
- log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].",
commitPartitionLeaseholder.name(), commitPartNodes);
+ log.info("Test: Commit partition [part={}, leaseholder={},
hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+ commitPartNodes);
view.upsert(tx, tuple);
@@ -578,17 +579,20 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
boolean[] cleanupAllowed = new boolean[1];
- commitPartitionLeaseholder.dropMessages((n, msg) -> {
+ // Cleanup may be triggered by the primary replica reelection as well.
+ runningNodes().filter(n ->
commitPartNodes.contains(n.name())).forEach(nd -> nd.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
cleanupStarted.complete(null);
- cleanupAllowedFut.join();
+ log.warn("Test: cleanup started.");
- return true;
+ joinWithTimeout(cleanupAllowedFut);
+
+ log.info("Test: cleanup resumed.");
}
return false;
- });
+ }));
Transaction roTxBefore = beginReadOnlyTx(anyNode());
@@ -596,6 +600,8 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+ log.info("Test: state replicated.");
+
assertThat(cleanupStarted, willCompleteSuccessfully());
// Wait for volatile tx state vacuum. This is possible because tx
finish is complete.
@@ -604,7 +610,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
log.info("Test: volatile state vacuumized");
- cleanupAllowedFut.complete(null);
+ assertTrue(cleanupAllowedFut.complete(null));
cleanupAllowed[0] = true;
@@ -614,11 +620,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
Transaction roTxAfter = beginReadOnlyTx(anyNode());
- waitForCondition(() -> {
- TxStateMeta txStateMeta = (TxStateMeta)
volatileTxState(commitPartitionLeaseholder, txId);
-
- return txStateMeta != null &&
txStateMeta.cleanupCompletionTimestamp() != null;
- }, 10_000);
+ waitForCleanupCompletion(commitPartNodes, txId);
log.info("Test: cleanup completed.");
@@ -689,13 +691,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
tx0.commitAsync();
- // Check that the final tx state COMMITTED is saved to the persistent
tx storage.
- assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n ->
commitPartitionNodes.contains(n.name())).allMatch(n -> {
- TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
-
- return meta != null && meta.txState() == COMMITTED;
- }), 10_000));
-
+ // Cleanup starts not earlier than the finish command is applied to
commit partition group.
assertThat(cleanupStarted, willCompleteSuccessfully());
// Stop the first transaction coordinator.
@@ -737,8 +733,6 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
*/
@Test
public void testRoReadTheCorrectDataInBetween() {
- setTxResourceTtl(0);
-
IgniteImpl node = anyNode();
String tableName = TABLE_NAME + "_1";
@@ -884,6 +878,28 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
assertTrue(r);
}
+ /**
+ * Wait for cleanup completion timestamp on any node of commit partition
group.
+ *
+ * @param commitPartitionNodeNames Node names of nodes in commit partition
group.
+ * @param txId Transaction id.
+ */
+ private void waitForCleanupCompletion(Set<String>
commitPartitionNodeNames, UUID txId) throws InterruptedException {
+ Set<IgniteImpl> commitPartitionNodes = runningNodes().filter(n ->
commitPartitionNodeNames.contains(n.name())).collect(toSet());
+
+ assertTrue(waitForCondition(() -> {
+ boolean res = false;
+
+ for (IgniteImpl node : commitPartitionNodes) {
+ TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(node,
txId);
+
+ res = res || txStateMeta != null &&
txStateMeta.cleanupCompletionTimestamp() != null;
+ }
+
+ return res;
+ }, 10_000));
+ }
+
/**
* Assert that volatile (and if needed, persistent) state of the given tx
is vacuumized on all nodes of the cluster.
*
@@ -1051,4 +1067,14 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
.findFirst()
.get();
}
+
+ private void joinWithTimeout(CompletableFuture<?> future) {
+ future.orTimeout(60, TimeUnit.SECONDS)
+ .exceptionally(e -> {
+ log.error("Could not wait for the future.", e);
+
+ return null;
+ })
+ .join();
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 56f1813a78..1df76b7744 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -726,7 +726,9 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
commit ? COMMITTED : ABORTED,
old == null ? null : old.txCoordinatorId(),
old == null ? partId : old.commitPartitionId(),
- commit ? commitTimestamp : null
+ commit ? commitTimestamp : null,
+ old == null ? null : old.initialVacuumObservationTimestamp(),
+ old == null ? null : old.cleanupCompletionTimestamp()
));
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index aa3aab5613..52fef603f9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -3922,7 +3922,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
txState,
old == null ? null : old.txCoordinatorId(),
old == null ? null : old.commitPartitionId(),
- txState == COMMITTED ? commitTimestamp : null
+ txState == COMMITTED ? commitTimestamp : null,
+ old == null ? null : old.initialVacuumObservationTimestamp(),
+ old == null ? null : old.cleanupCompletionTimestamp()
));
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
index f73e9c9021..1676be8d35 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
@@ -57,12 +57,12 @@ public class FinishedTransactionBatchRequestHandler {
public void start() {
messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender,
correlationId) -> {
if (msg instanceof FinishedTransactionsBatchMessage) {
- processTxCleanup((FinishedTransactionsBatchMessage) msg);
+
processFinishedTransactionsBatchMessage((FinishedTransactionsBatchMessage) msg);
}
});
}
- private void processTxCleanup(FinishedTransactionsBatchMessage
closeCursorsMessage) {
+ private void
processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMessage
closeCursorsMessage) {
asyncExecutor.execute(() ->
closeCursorsMessage.transactions().forEach(resourcesRegistry::close));
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 7af5d71a0e..9796727c2c 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -139,7 +139,7 @@ public class TxCleanupRequestHandler {
if (ex == null) {
msg = prepareResponse();
} else {
- msg = prepareErrorResponse(ex);
+ msg = prepareErrorResponse(txCleanupMessage.txId(),
ex);
// Run durable cleanup for the partitions that we
failed to cleanup properly.
// No need to wait on this future.
@@ -178,9 +178,10 @@ public class TxCleanupRequestHandler {
.build();
}
- private NetworkMessage prepareErrorResponse(Throwable th) {
+ private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
return FACTORY
.txCleanupMessageErrorResponse()
+ .txId(txId)
.throwable(th)
.timestampLong(clockService.nowLong())
.build();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 7c43ebf3e7..dcf97ff5da 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -32,12 +32,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
@@ -47,6 +50,9 @@ import org.jetbrains.annotations.Nullable;
* Sends TX Cleanup request.
*/
public class TxCleanupRequestSender {
+ /** Logger. */
+ private final IgniteLogger log =
Loggers.forClass(TxCleanupRequestSender.class);
+
/** Placement driver helper. */
private final PlacementDriverHelper placementDriverHelper;
@@ -86,6 +92,12 @@ public class TxCleanupRequestSender {
if (result != null) {
onCleanupReplicated(result);
}
+
+ if (msg instanceof TxCleanupMessageErrorResponse) {
+ TxCleanupMessageErrorResponse response =
(TxCleanupMessageErrorResponse) msg;
+
+ log.warn("Exception happened during transaction cleanup
[txId={}].", response.throwable(), response.txId());
+ }
}
});
}
@@ -168,6 +180,9 @@ public class TxCleanupRequestSender {
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
+ // Start tracking the partitions we want to learn the replication
confirmation from.
+ writeIntentsReplicated.put(txId, new CleanupContext(new
HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED));
+
return placementDriverHelper.findPrimaryReplicas(partitionIds)
.thenCompose(partitionData -> {
cleanupPartitionsWithoutPrimary(commit, commitTimestamp,
txId, partitionData.partitionsWithoutPrimary);
@@ -257,7 +272,7 @@ public class TxCleanupRequestSender {
*/
private final TxState txState;
- public CleanupContext(Set<TablePartitionId> partitions, TxState
txState) {
+ private CleanupContext(Set<TablePartitionId> partitions, TxState
txState) {
this.partitions = partitions;
this.txState = txState;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
index 5be7788fa3..8539742de1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.tx.message;
+import java.util.UUID;
import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
@@ -25,6 +26,13 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
*/
@Transferable(TxMessageGroup.TX_CLEANUP_MSG_ERR_RESPONSE)
public interface TxCleanupMessageErrorResponse extends
TxCleanupMessageResponse {
+ /**
+ * Transaction id.
+ *
+ * @return Transaction id.
+ */
+ UUID txId();
+
/**
* Returns a {@link Throwable} that was thrown during handling a lock
release message.
*