This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e0424de400 IGNITE-22147 Fixed flaky ItTxResourcesVacuumTest.testRecoveryAfterPersistentStateVacuumized (#3689)
e0424de400 is described below

commit e0424de400770c2b669f6faf5e0ff019c5934189
Author: Denis Chudov <[email protected]>
AuthorDate: Fri May 10 13:30:53 2024 +0300

    IGNITE-22147 Fixed flaky ItTxResourcesVacuumTest.testRecoveryAfterPersistentStateVacuumized (#3689)
---
 .../internal/table/ItTxResourcesVacuumTest.java    | 84 ++++++++++++++--------
 .../table/distributed/raft/PartitionListener.java  |  4 +-
 .../replicator/PartitionReplicaListener.java       |  4 +-
 .../FinishedTransactionBatchRequestHandler.java    |  4 +-
 .../internal/tx/impl/TxCleanupRequestHandler.java  |  5 +-
 .../internal/tx/impl/TxCleanupRequestSender.java   | 17 ++++-
 .../tx/message/TxCleanupMessageErrorResponse.java  |  8 +++
 7 files changed, 90 insertions(+), 36 deletions(-)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index ccb91a867b..95f2408f3f 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -24,7 +24,6 @@ import static 
org.apache.ignite.internal.table.NodeUtils.transferPrimary;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static 
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
 import static 
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
@@ -229,7 +228,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
                 if (finishRequest.txId().equals(txId)) {
                     finishStartedFuture.complete(null);
 
-                    finishAllowedFuture.join();
+                    joinWithTimeout(finishAllowedFuture);
                 }
             }
 
@@ -250,7 +249,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
         // Check that the volatile state of the transaction is preserved.
         assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
 
-        finishAllowedFuture.complete(null);
+        assertTrue(finishAllowedFuture.complete(null));
 
         assertThat(commitFut, willCompleteSuccessfully());
 
@@ -382,7 +381,8 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         Set<String> commitPartNodes = partitionAssignment(node, new 
TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
 
-        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", 
commitPartitionLeaseholder.name(), commitPartNodes);
+        log.info("Test: Commit partition [part={}, leaseholder={}, 
hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+                commitPartNodes);
 
         // Some node that does not host the commit partition, will be the 
primary node for upserting another tuple.
         IgniteImpl leaseholderForAnotherTuple = findNode(n -> 
!commitPartNodes.contains(n.name()));
@@ -405,7 +405,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
                 log.info("Test: cleanup started.");
 
                 if (commitPartNodes.contains(n)) {
-                    cleanupAllowed.join();
+                    joinWithTimeout(cleanupAllowed);
                 }
             }
 
@@ -425,13 +425,13 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
         assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), 
txId, commitPartId, false);
 
         // Unblocking cleanup.
-        cleanupAllowed.complete(null);
+        assertTrue(cleanupAllowed.complete(null));
 
         assertThat(commitFut, willCompleteSuccessfully());
 
         Transaction roTxAfter = beginReadOnlyTx(anyNode());
 
-        waitForCondition(() -> volatileTxState(commitPartitionLeaseholder, 
txId) != null, 10_000);
+        waitForCleanupCompletion(commitPartNodes, txId);
 
         triggerVacuum();
         assertTxStateVacuumized(txId, commitPartId, true);
@@ -498,7 +498,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
             if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
                 cleanupStarted.complete(null);
 
-                cleanupAllowedFut.join();
+                joinWithTimeout(cleanupAllowedFut);
             }
 
             return false;
@@ -512,7 +512,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         transferPrimary(cluster.runningNodes().collect(toSet()), 
commitPartGrpId, commitPartNodes::contains);
 
-        cleanupAllowedFut.complete(null);
+        assertTrue(cleanupAllowedFut.complete(null));
 
         cleanupAllowed[0] = true;
 
@@ -570,7 +570,8 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         Set<String> commitPartNodes = partitionAssignment(node, new 
TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
 
-        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", 
commitPartitionLeaseholder.name(), commitPartNodes);
+        log.info("Test: Commit partition [part={}, leaseholder={}, 
hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+                commitPartNodes);
 
         view.upsert(tx, tuple);
 
@@ -578,17 +579,20 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
         CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
         boolean[] cleanupAllowed = new boolean[1];
 
-        commitPartitionLeaseholder.dropMessages((n, msg) -> {
+        // Cleanup may be triggered by the primary replica reelection as well.
+        runningNodes().filter(n -> 
commitPartNodes.contains(n.name())).forEach(nd -> nd.dropMessages((n, msg) -> {
             if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
                 cleanupStarted.complete(null);
 
-                cleanupAllowedFut.join();
+                log.warn("Test: cleanup started.");
 
-                return true;
+                joinWithTimeout(cleanupAllowedFut);
+
+                log.info("Test: cleanup resumed.");
             }
 
             return false;
-        });
+        }));
 
         Transaction roTxBefore = beginReadOnlyTx(anyNode());
 
@@ -596,6 +600,8 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
 
+        log.info("Test: state replicated.");
+
         assertThat(cleanupStarted, willCompleteSuccessfully());
 
         // Wait for volatile tx state vacuum. This is possible because tx 
finish is complete.
@@ -604,7 +610,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         log.info("Test: volatile state vacuumized");
 
-        cleanupAllowedFut.complete(null);
+        assertTrue(cleanupAllowedFut.complete(null));
 
         cleanupAllowed[0] = true;
 
@@ -614,11 +620,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         Transaction roTxAfter = beginReadOnlyTx(anyNode());
 
-        waitForCondition(() -> {
-            TxStateMeta txStateMeta = (TxStateMeta) 
volatileTxState(commitPartitionLeaseholder, txId);
-
-            return txStateMeta != null && 
txStateMeta.cleanupCompletionTimestamp() != null;
-        }, 10_000);
+        waitForCleanupCompletion(commitPartNodes, txId);
 
         log.info("Test: cleanup completed.");
 
@@ -689,13 +691,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         tx0.commitAsync();
 
-        // Check that the final tx state COMMITTED is saved to the persistent 
tx storage.
-        assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n -> 
commitPartitionNodes.contains(n.name())).allMatch(n -> {
-            TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
-
-            return meta != null && meta.txState() == COMMITTED;
-        }), 10_000));
-
+        // Cleanup starts not earlier than the finish command is applied to 
commit partition group.
         assertThat(cleanupStarted, willCompleteSuccessfully());
 
         // Stop the first transaction coordinator.
@@ -737,8 +733,6 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
      */
     @Test
     public void testRoReadTheCorrectDataInBetween() {
-        setTxResourceTtl(0);
-
         IgniteImpl node = anyNode();
 
         String tableName = TABLE_NAME + "_1";
@@ -884,6 +878,28 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
         assertTrue(r);
     }
 
+    /**
+     * Wait for cleanup completion timestamp on any node of commit partition 
group.
+     *
+     * @param commitPartitionNodeNames Node names of nodes in commit partition 
group.
+     * @param txId Transaction id.
+     */
+    private void waitForCleanupCompletion(Set<String> 
commitPartitionNodeNames, UUID txId) throws InterruptedException {
+        Set<IgniteImpl> commitPartitionNodes = runningNodes().filter(n -> 
commitPartitionNodeNames.contains(n.name())).collect(toSet());
+
+        assertTrue(waitForCondition(() -> {
+            boolean res = false;
+
+            for (IgniteImpl node : commitPartitionNodes) {
+                TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(node, 
txId);
+
+                res = res || txStateMeta != null && 
txStateMeta.cleanupCompletionTimestamp() != null;
+            }
+
+            return res;
+        }, 10_000));
+    }
+
     /**
      * Assert that volatile (and if needed, persistent) state of the given tx 
is vacuumized on all nodes of the cluster.
      *
@@ -1051,4 +1067,14 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
                 .findFirst()
                 .get();
     }
+
+    private void joinWithTimeout(CompletableFuture<?> future) {
+        future.orTimeout(60, TimeUnit.SECONDS)
+                .exceptionally(e -> {
+                    log.error("Could not wait for the future.", e);
+
+                    return null;
+                })
+                .join();
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 56f1813a78..1df76b7744 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -726,7 +726,9 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                 commit ? COMMITTED : ABORTED,
                 old == null ? null : old.txCoordinatorId(),
                 old == null ? partId : old.commitPartitionId(),
-                commit ? commitTimestamp : null
+                commit ? commitTimestamp : null,
+                old == null ? null : old.initialVacuumObservationTimestamp(),
+                old == null ? null : old.cleanupCompletionTimestamp()
         ));
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index aa3aab5613..52fef603f9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -3922,7 +3922,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 txState,
                 old == null ? null : old.txCoordinatorId(),
                 old == null ? null : old.commitPartitionId(),
-                txState == COMMITTED ? commitTimestamp : null
+                txState == COMMITTED ? commitTimestamp : null,
+                old == null ? null : old.initialVacuumObservationTimestamp(),
+                old == null ? null : old.cleanupCompletionTimestamp()
         ));
     }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
index f73e9c9021..1676be8d35 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
@@ -57,12 +57,12 @@ public class FinishedTransactionBatchRequestHandler {
     public void start() {
         messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, 
correlationId) -> {
             if (msg instanceof FinishedTransactionsBatchMessage) {
-                processTxCleanup((FinishedTransactionsBatchMessage) msg);
+                
processFinishedTransactionsBatchMessage((FinishedTransactionsBatchMessage) msg);
             }
         });
     }
 
-    private void processTxCleanup(FinishedTransactionsBatchMessage 
closeCursorsMessage) {
+    private void 
processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMessage 
closeCursorsMessage) {
         asyncExecutor.execute(() -> 
closeCursorsMessage.transactions().forEach(resourcesRegistry::close));
     }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 7af5d71a0e..9796727c2c 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -139,7 +139,7 @@ public class TxCleanupRequestHandler {
                     if (ex == null) {
                         msg = prepareResponse();
                     } else {
-                        msg = prepareErrorResponse(ex);
+                        msg = prepareErrorResponse(txCleanupMessage.txId(), 
ex);
 
                         // Run durable cleanup for the partitions that we 
failed to cleanup properly.
                         // No need to wait on this future.
@@ -178,9 +178,10 @@ public class TxCleanupRequestHandler {
                 .build();
     }
 
-    private NetworkMessage prepareErrorResponse(Throwable th) {
+    private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
         return FACTORY
                 .txCleanupMessageErrorResponse()
+                .txId(txId)
                 .throwable(th)
                 .timestampLong(clockService.nowLong())
                 .build();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 7c43ebf3e7..dcf97ff5da 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -32,12 +32,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import 
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
 import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.util.CompletableFutures;
@@ -47,6 +50,9 @@ import org.jetbrains.annotations.Nullable;
  * Sends TX Cleanup request.
  */
 public class TxCleanupRequestSender {
+    /** Logger. */
+    private final IgniteLogger log = 
Loggers.forClass(TxCleanupRequestSender.class);
+
     /** Placement driver helper. */
     private final PlacementDriverHelper placementDriverHelper;
 
@@ -86,6 +92,12 @@ public class TxCleanupRequestSender {
                 if (result != null) {
                     onCleanupReplicated(result);
                 }
+
+                if (msg instanceof TxCleanupMessageErrorResponse) {
+                    TxCleanupMessageErrorResponse response = 
(TxCleanupMessageErrorResponse) msg;
+
+                    log.warn("Exception happened during transaction cleanup 
[txId={}].", response.throwable(), response.txId());
+                }
             }
         });
     }
@@ -168,6 +180,9 @@ public class TxCleanupRequestSender {
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
+        // Start tracking the partitions we want to learn the replication 
confirmation from.
+        writeIntentsReplicated.put(txId, new CleanupContext(new 
HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED));
+
         return placementDriverHelper.findPrimaryReplicas(partitionIds)
                 .thenCompose(partitionData -> {
                     cleanupPartitionsWithoutPrimary(commit, commitTimestamp, 
txId, partitionData.partitionsWithoutPrimary);
@@ -257,7 +272,7 @@ public class TxCleanupRequestSender {
          */
         private final TxState txState;
 
-        public CleanupContext(Set<TablePartitionId> partitions, TxState 
txState) {
+        private CleanupContext(Set<TablePartitionId> partitions, TxState 
txState) {
             this.partitions = partitions;
 
             this.txState = txState;
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
index 5be7788fa3..8539742de1 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx.message;
 
+import java.util.UUID;
 import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
 
@@ -25,6 +26,13 @@ import 
org.apache.ignite.internal.network.annotations.Transferable;
  */
 @Transferable(TxMessageGroup.TX_CLEANUP_MSG_ERR_RESPONSE)
 public interface TxCleanupMessageErrorResponse extends 
TxCleanupMessageResponse {
+    /**
+     * Transaction id.
+     *
+     * @return Transaction id.
+     */
+    UUID txId();
+
     /**
      * Returns a {@link Throwable} that was thrown during handling a lock 
release message.
      *

Reply via email to