This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eb5651679b IGNITE-22288 Fix the vacuum of persistent tx state before 
tx cleanup completion (#3799)
eb5651679b is described below

commit eb5651679ba708b9f01ef34d9b1683b49dae22dd
Author: Denis Chudov <[email protected]>
AuthorDate: Thu May 30 09:29:55 2024 +0300

    IGNITE-22288 Fix the vacuum of persistent tx state before tx cleanup 
completion (#3799)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |  4 +-
 .../internal/table/ItTxResourcesVacuumTest.java    |  1 +
 .../replicator/PartitionReplicaListener.java       | 14 +++--
 .../replication/PartitionReplicaListenerTest.java  |  8 +--
 .../org/apache/ignite/internal/tx/TxManager.java   |  7 ++-
 .../tx/impl/PersistentTxStateVacuumizer.java       | 63 +++++++++++++++++++---
 .../internal/tx/impl/TxCleanupRequestSender.java   | 48 +++++++++++------
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 10 ++--
 .../tx/impl/VolatileTxStateMetaStorage.java        | 30 +++++++----
 .../apache/ignite/internal/tx/TxCleanupTest.java   |  6 +--
 10 files changed, 140 insertions(+), 51 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 53af23ec3d..dc03d55bf6 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -190,6 +190,7 @@ public class FakeTxManager implements TxManager {
 
     @Override
     public CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -200,6 +201,7 @@ public class FakeTxManager implements TxManager {
 
     @Override
     public CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Collection<TablePartitionId> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -209,7 +211,7 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public CompletableFuture<Void> cleanup(String node, UUID txId) {
+    public CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId, 
String node, UUID txId) {
         return nullCompletedFuture();
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index cd40ce6168..56ac729434 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -800,6 +800,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
         if (expected == null) {
             assertNull(actual);
         } else {
+            assertNotNull(actual);
             assertEquals(expected.stringValue("val"), 
actual.stringValue("val"));
         }
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index f2b1e092b7..b38ddbc09c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -421,6 +421,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 txManager.cleanup(
+                        replicationGroupId,
                         txMeta.enlistedPartitions(),
                         txMeta.txState() == COMMITTED,
                         txMeta.commitTimestamp(),
@@ -507,7 +508,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         // Check whether a transaction has already been finished.
         if (txMeta != null && isFinalState(txMeta.txState())) {
-            return runCleanupOnNode(txId, senderId);
+            // Tx recovery message is processed on the commit partition.
+            return runCleanupOnNode(replicationGroupId, txId, senderId);
         }
 
         LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", 
txId, txMeta);
@@ -518,14 +520,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /**
      * Run cleanup on a node.
      *
+     * @param commitPartitionId Commit partition id.
      * @param txId Transaction id.
      * @param nodeId Node id (inconsistent).
      */
-    private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId) 
{
+    private CompletableFuture<Void> runCleanupOnNode(TablePartitionId 
commitPartitionId, UUID txId, String nodeId) {
         // Get node id of the sender to send back cleanup requests.
         String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
 
-        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(nodeConsistentId, txId);
+        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(commitPartitionId, nodeConsistentId, txId);
     }
 
     /**
@@ -540,13 +543,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         // is sent in a common durable manner to a partition that have 
initiated recovery.
         return txManager.finish(
                         new HybridTimestampTracker(),
+                        // Tx recovery is executed on the commit partition.
                         replicationGroupId,
                         false,
                         // Enlistment consistency token is not required for 
the rollback, so it is 0L.
                         Map.of(replicationGroupId, new 
IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)),
                         txId
                 )
-                .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
+                .whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId, 
txId, senderId));
     }
 
     /**
@@ -1694,7 +1698,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         return finishTransaction(enlistedPartitions.keySet(), txId, commit, 
commitTimestamp)
                 .thenCompose(txResult ->
-                        txManager.cleanup(enlistedPartitions, commit, 
commitTimestamp, txId)
+                        txManager.cleanup(replicationGroupId, 
enlistedPartitions, commit, commitTimestamp, txId)
                                 .thenApply(v -> txResult)
                 );
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 015285ba95..73ec92e0d4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1669,7 +1669,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<?> beginAndAbortTx() {
-        when(txManager.cleanup(any(Map.class), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
+        when(txManager.cleanup(any(), any(Map.class), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
 
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = transactionIdFor(beginTimestamp);
@@ -1731,7 +1731,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<?> beginAndCommitTx() {
-        when(txManager.cleanup(any(Map.class), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
+        when(txManager.cleanup(any(), any(Map.class), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
 
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = transactionIdFor(beginTimestamp);
@@ -2072,7 +2072,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
 
         doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(), 
any(), any());
-        doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).cleanup(anyString(), any());
+        doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).cleanup(any(), anyString(), any());
     }
 
     private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType 
requestType, RwListenerInvocation listenerInvocation) {
@@ -2473,7 +2473,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .thenReturn(List.of(
                         tableSchema(CURRENT_SCHEMA_VERSION, 
List.of(nullableColumn("col")))
                 ));
-        when(txManager.cleanup(any(Map.class), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
+        when(txManager.cleanup(any(), any(Map.class), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
 
         AtomicReference<Boolean> committed = interceptFinishTxCommand();
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index badc077651..de59c157a4 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -145,6 +145,7 @@ public interface TxManager extends IgniteComponent {
      *
      * <p>The nodes to send the request to are taken from the mapping 
`partition id -> partition primary`.
      *
+     * @param commitPartitionId Commit partition id.
      * @param enlistedPartitions Map of partition groups to their primary 
nodes.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -152,6 +153,7 @@ public interface TxManager extends IgniteComponent {
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -163,6 +165,7 @@ public interface TxManager extends IgniteComponent {
      *
      * <p>The nodes to sends the request to are calculated by the placement 
driver.
      *
+     * @param commitPartitionId Commit partition id.
      * @param enlistedPartitions Enlisted partition groups.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -170,6 +173,7 @@ public interface TxManager extends IgniteComponent {
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Collection<TablePartitionId> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -179,11 +183,12 @@ public interface TxManager extends IgniteComponent {
     /**
      * Sends cleanup request to the nodes than initiated recovery.
      *
+     * @param commitPartitionId Commit partition id.
      * @param node Target node.
      * @param txId Transaction id.
      * @return Completable future of Void.
      */
-    CompletableFuture<Void> cleanup(String node, UUID txId);
+    CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId, String 
node, UUID txId);
 
     /**
      * Locally vacuums no longer needed transactional resources, like txnState 
both persistent and volatile.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 52beab9d7d..4ef589a24f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -17,17 +17,20 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -39,6 +42,7 @@ import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissExcepti
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Implements the logic of persistent tx states vacuum.
@@ -79,12 +83,13 @@ public class PersistentTxStateVacuumizer {
     /**
      * Vacuum persistent tx states.
      *
-     * @param txIds Transaction ids to vacuum; map of commit partition ids to 
sets of tx ids.
-     * @return A future, result is the set of successfully vacuumized txn 
states.
+     * @param txIds Transaction ids to vacuum; map of commit partition ids to 
sets of {@link VacuumizableTx}.
+     * @return A future, result is the set of successfully processed txn 
states and count of persistent states that were vacuumized.
      */
-    public CompletableFuture<Set<UUID>> 
vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
+    public CompletableFuture<PersistentTxStateVacuumResult> 
vacuumPersistentTxStates(Map<TablePartitionId, Set<VacuumizableTx>> txIds) {
         Set<UUID> successful = ConcurrentHashMap.newKeySet();
         List<CompletableFuture<?>> futures = new ArrayList<>();
+        AtomicInteger vacuumizedPersistentTxnStatesCount = new AtomicInteger();
         HybridTimestamp now = clockService.now();
 
         txIds.forEach((commitPartitionId, txs) -> {
@@ -95,15 +100,29 @@ public class PersistentTxStateVacuumizer {
                         // timestamp) would be updated there, and then this 
operation would be called from there.
                         // Also, we are going to send the vacuum request only 
to the local node.
                         if (replicaMeta != null && 
localNode.id().equals(replicaMeta.getLeaseholderId())) {
+                            // Filter txns to define those which persistent 
states can be vacuumized.
+                            Set<UUID> filteredTxIds = new HashSet<>();
+
+                            for (VacuumizableTx v : txs) {
+                                if (v.cleanupCompletionTimestamp == null) {
+                                    // If there is no cleanup completion 
timestamp, add the tx id to successful set to remove the
+                                    // volatile state. Persistent state should 
be preserved until tx cleanup is complete.
+                                    successful.add(v.txId);
+                                } else {
+                                    filteredTxIds.add(v.txId);
+                                }
+                            }
+
                             VacuumTxStateReplicaRequest request = 
TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
                                     
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
                                     .groupId(commitPartitionId)
-                                    .transactionIds(txs)
+                                    .transactionIds(filteredTxIds)
                                     .build();
 
                             return replicaService.invoke(localNode, 
request).whenComplete((v, e) -> {
                                 if (e == null) {
-                                    successful.addAll(txs);
+                                    successful.addAll(filteredTxIds);
+                                    
vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size());
                                     // We can log the exceptions without 
further handling because failed requests' txns are not added
                                     // to the set of successful and will be 
retried. PrimaryReplicaMissException can be considered as
                                     // a part of regular flow and doesn't need 
to be logged.
@@ -114,7 +133,7 @@ public class PersistentTxStateVacuumizer {
                                 }
                             });
                         } else {
-                            successful.addAll(txs);
+                            successful.addAll(txs.stream().map(v -> 
v.txId).collect(toSet()));
 
                             return nullCompletedFuture();
                         }
@@ -124,6 +143,36 @@ public class PersistentTxStateVacuumizer {
         });
 
         return allOf(futures.toArray(new CompletableFuture[0]))
-                .handle((unused, unusedEx) -> successful);
+                .handle((unused, unusedEx) -> new 
PersistentTxStateVacuumResult(successful, 
vacuumizedPersistentTxnStatesCount.get()));
+    }
+
+    /**
+     * Class representing the vacuumizable tx context, actually a pair of the 
tx id and the cleanup completion timestamp.
+     */
+    public static class VacuumizableTx {
+        final UUID txId;
+
+        @Nullable
+        final Long cleanupCompletionTimestamp;
+
+        VacuumizableTx(UUID txId, @Nullable Long cleanupCompletionTimestamp) {
+            this.txId = txId;
+            this.cleanupCompletionTimestamp = cleanupCompletionTimestamp;
+        }
+    }
+
+    /**
+     * Result of the persistent tx state vacuum operation.
+     */
+    public static class PersistentTxStateVacuumResult {
+        /** Transaction IDs that are processed by the persistent vacuumizer 
and can be removed from volatile storage. */
+        final Set<UUID> txnsToVacuum;
+
+        final int vacuumizedPersistentTxnStatesCount;
+
+        public PersistentTxStateVacuumResult(Set<UUID> txnsToVacuum, int 
vacuumizedPersistentTxnStatesCount) {
+            this.txnsToVacuum = txnsToVacuum;
+            this.vacuumizedPersistentTxnStatesCount = 
vacuumizedPersistentTxnStatesCount;
+        }
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 82cfee4fab..3e434e9b28 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -110,19 +110,19 @@ public class TxCleanupRequestSender {
         });
 
         if (ctx != null && ctx.partitions.isEmpty()) {
-            markTxnCleanupReplicated(info.txId(), ctx.txState);
+            markTxnCleanupReplicated(info.txId(), ctx.txState, 
ctx.commitPartitionId);
 
             writeIntentsReplicated.remove(info.txId());
         }
     }
 
-    private void markTxnCleanupReplicated(UUID txId, TxState state) {
+    private void markTxnCleanupReplicated(UUID txId, TxState state, 
TablePartitionId commitPartitionId) {
         long cleanupCompletionTimestamp = System.currentTimeMillis();
 
         txStateVolatileStorage.updateMeta(txId, oldMeta ->
                 new TxStateMeta(oldMeta == null ? state : oldMeta.txState(),
                         oldMeta == null ? null : oldMeta.txCoordinatorId(),
-                        oldMeta == null ? null : oldMeta.commitPartitionId(),
+                        commitPartitionId,
                         oldMeta == null ? null : oldMeta.commitTimestamp(),
                         oldMeta == null ? null : 
oldMeta.initialVacuumObservationTimestamp(),
                         cleanupCompletionTimestamp)
@@ -132,17 +132,19 @@ public class TxCleanupRequestSender {
     /**
      * Sends unlock request to the nodes than initiated recovery.
      *
+     * @param commitPartitionId Commit partition id.
      * @param node Target node.
      * @param txId Transaction id.
      * @return Completable future of Void.
      */
-    public CompletableFuture<Void> cleanup(String node, UUID txId) {
-        return sendCleanupMessageWithRetries(false, null, txId, node, null);
+    public CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId, 
String node, UUID txId) {
+        return sendCleanupMessageWithRetries(commitPartitionId, false, null, 
txId, node, null);
     }
 
     /**
      * Sends cleanup request to the primary nodes of each one of {@code 
partitions}.
      *
+     * @param commitPartitionId Commit partition id.
      * @param enlistedPartitions Map of enlisted partition group to the 
initial primary node.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -150,24 +152,29 @@ public class TxCleanupRequestSender {
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
         // Start tracking the partitions we want to learn the replication 
confirmation from.
-        writeIntentsReplicated.put(txId, new 
CleanupContext(enlistedPartitions.keySet(), commit ? TxState.COMMITTED : 
TxState.ABORTED));
+        writeIntentsReplicated.put(
+                txId,
+                new CleanupContext(commitPartitionId, 
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
+        );
 
         Map<String, Set<TablePartitionId>> partitions = new HashMap<>();
         enlistedPartitions.forEach((partitionId, nodeId) ->
                 partitions.computeIfAbsent(nodeId, node -> new 
HashSet<>()).add(partitionId));
 
-        return cleanupPartitions(partitions, commit, commitTimestamp, txId);
+        return cleanupPartitions(commitPartitionId, partitions, commit, 
commitTimestamp, txId);
     }
 
     /**
      * Gets primary nodes for each of the provided {@code partitions} and 
sends cleanup request to each one.
      *
+     * @param commitPartitionId Commit partition id.
      * @param partitionIds Collection of enlisted partition groups.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -175,23 +182,29 @@ public class TxCleanupRequestSender {
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Collection<TablePartitionId> partitionIds,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
         // Start tracking the partitions we want to learn the replication 
confirmation from.
-        writeIntentsReplicated.put(txId, new CleanupContext(new 
HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED));
+        writeIntentsReplicated.put(
+                txId,
+                new CleanupContext(commitPartitionId, new 
HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED)
+        );
 
         return placementDriverHelper.findPrimaryReplicas(partitionIds)
                 .thenCompose(partitionData -> {
-                    cleanupPartitionsWithoutPrimary(commit, commitTimestamp, 
txId, partitionData.partitionsWithoutPrimary);
+                    cleanupPartitionsWithoutPrimary(commitPartitionId, commit, 
commitTimestamp, txId,
+                            partitionData.partitionsWithoutPrimary);
 
-                    return cleanupPartitions(partitionData.partitionsByNode, 
commit, commitTimestamp, txId);
+                    return cleanupPartitions(commitPartitionId, 
partitionData.partitionsByNode, commit, commitTimestamp, txId);
                 });
     }
 
     private void cleanupPartitionsWithoutPrimary(
+            TablePartitionId commitPartitionId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
@@ -200,10 +213,11 @@ public class TxCleanupRequestSender {
         // For the partitions without primary, we need to wait until a new 
primary is found.
         // Then we can proceed with the common cleanup flow.
         placementDriverHelper.awaitPrimaryReplicas(noPrimaryFound)
-                .thenCompose(partitionsByNode -> 
cleanupPartitions(partitionsByNode, commit, commitTimestamp, txId));
+                .thenCompose(partitionsByNode -> 
cleanupPartitions(commitPartitionId, partitionsByNode, commit, commitTimestamp, 
txId));
     }
 
     private CompletableFuture<Void> cleanupPartitions(
+            TablePartitionId commitPartitionId,
             Map<String, Set<TablePartitionId>> partitionsByNode,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -215,13 +229,14 @@ public class TxCleanupRequestSender {
             String node = entry.getKey();
             Set<TablePartitionId> nodePartitions = entry.getValue();
 
-            cleanupFutures.add(sendCleanupMessageWithRetries(commit, 
commitTimestamp, txId, node, nodePartitions));
+            
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node, nodePartitions));
         }
 
         return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
     }
 
     private CompletableFuture<Void> sendCleanupMessageWithRetries(
+            TablePartitionId commitPartitionId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
@@ -244,12 +259,12 @@ public class TxCleanupRequestSender {
                             if (partitions == null) {
                                 // If we don't have any partition, which is 
the recovery case,
                                 // just try again with the same node.
-                                return sendCleanupMessageWithRetries(commit, 
commitTimestamp, txId, node, partitions);
+                                return 
sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, 
node, partitions);
                             }
 
                             // Run a cleanup that finds new primaries for the 
given partitions.
                             // This covers the case when a partition primary 
died and we still want to switch write intents.
-                            return cleanup(partitions, commit, 
commitTimestamp, txId);
+                            return cleanup(commitPartitionId, partitions, 
commit, commitTimestamp, txId);
                         }
 
                         return CompletableFuture.<Void>failedFuture(throwable);
@@ -261,6 +276,7 @@ public class TxCleanupRequestSender {
     }
 
     private static class CleanupContext {
+        private final TablePartitionId commitPartitionId;
 
         /**
          * The partitions the we have not received write intent replication 
confirmation for.
@@ -272,9 +288,9 @@ public class TxCleanupRequestSender {
          */
         private final TxState txState;
 
-        private CleanupContext(Set<TablePartitionId> partitions, TxState 
txState) {
+        private CleanupContext(TablePartitionId commitPartitionId, 
Set<TablePartitionId> partitions, TxState txState) {
+            this.commitPartitionId = commitPartitionId;
             this.partitions = partitions;
-
             this.txState = txState;
         }
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index e3ee44521b..077920549f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -822,27 +822,29 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     @Override
     public CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Map<TablePartitionId, String> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
-        return txCleanupRequestSender.cleanup(enlistedPartitions, commit, 
commitTimestamp, txId);
+        return txCleanupRequestSender.cleanup(commitPartitionId, 
enlistedPartitions, commit, commitTimestamp, txId);
     }
 
     @Override
     public CompletableFuture<Void> cleanup(
+            TablePartitionId commitPartitionId,
             Collection<TablePartitionId> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
-        return txCleanupRequestSender.cleanup(enlistedPartitions, commit, 
commitTimestamp, txId);
+        return txCleanupRequestSender.cleanup(commitPartitionId, 
enlistedPartitions, commit, commitTimestamp, txId);
     }
 
     @Override
-    public CompletableFuture<Void> cleanup(String node, UUID txId) {
-        return txCleanupRequestSender.cleanup(node, txId);
+    public CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId, 
String node, UUID txId) {
+        return txCleanupRequestSender.cleanup(commitPartitionId, node, txId);
     }
 
     @Override
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
index d0e65a9e43..49ec7bc2da 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
+import 
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult;
+import 
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VacuumizableTx;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -115,11 +117,19 @@ public class VolatileTxStateMetaStorage {
     }
 
     /**
-     * Locally vacuums no longer needed transactional resource.
+     * Locally vacuums no longer needed transactional resource. Also calls 
{@code persistentVacuumOp} to vacuum some persistent states.
      * For each finished (COMMITTED or ABORTED) transactions:
      * <ol>
-     *     <li> Removes it from the volatile storage if txnResourcesTTL == 0 
or if
-     *     txnState.initialVacuumObservationTimestamp + txnResourcesTTL < 
vacuumObservationTimestamp.</li>
+     *     <li> Takes every suitable txn state from the volatile storage if 
txnResourcesTTL == 0 or if
+     *     max(txnState.initialVacuumObservationTimestamp, 
txnState.cleanupCompletionTimestamp) + txnResourcesTTL <
+     *     vacuumObservationTimestamp. If txnState.cleanupCompletionTimestamp 
is {@code null}, then only
+     *     txnState.initialVacuumObservationTimestamp is used.</li>
+     *     <li>If txnState.commitPartitionId is {@code null}, then only 
volatile state is vacuumized; if not {@code null}, this
+     *     means we are probably on commit partition and this txnState should 
be passed to {@code persistentVacuumOp} to vacuumize the
+     *     persistent state as well. Only after such txn states are processed 
by {@code persistentVacuumOp} we can remove the volatile
+     *     txn state. Only states which are marked by {@code 
persistentVacuumOp} as successfully vacuumized and
+     *     {@code cleanupCompletionTimestamp} is the same that was passed to 
{@code persistentVacuumOp} can be removed, to prevent
+     *     races.</li>
      *     <li>Updates txnState.initialVacuumObservationTimestamp by setting 
it to vacuumObservationTimestamp
      *     if it's not already initialized.</li>
      * </ol>
@@ -133,7 +143,7 @@ public class VolatileTxStateMetaStorage {
     public CompletableFuture<Void> vacuum(
             long vacuumObservationTimestamp,
             long txnResourceTtl,
-            Function<Map<TablePartitionId, Set<UUID>>, 
CompletableFuture<Set<UUID>>> persistentVacuumOp
+            Function<Map<TablePartitionId, Set<VacuumizableTx>>, 
CompletableFuture<PersistentTxStateVacuumResult>> persistentVacuumOp
     ) {
         LOG.info("Vacuum started [vacuumObservationTimestamp={}, 
txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
 
@@ -142,7 +152,7 @@ public class VolatileTxStateMetaStorage {
         AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
         AtomicInteger skippedForFurtherProcessingUnfinishedTxnsCount = new 
AtomicInteger(0);
 
-        Map<TablePartitionId, Set<UUID>> txIds = new HashMap<>();
+        Map<TablePartitionId, Set<VacuumizableTx>> txIds = new HashMap<>();
         Map<UUID, Long> cleanupCompletionTimestamps = new HashMap<>();
 
         txStateMap.forEach((txId, meta) -> {
@@ -166,8 +176,8 @@ public class VolatileTxStateMetaStorage {
 
                                 return null;
                             } else {
-                                Set<UUID> ids = 
txIds.computeIfAbsent(meta0.commitPartitionId(), k -> new HashSet<>());
-                                ids.add(txId);
+                                Set<VacuumizableTx> ids = 
txIds.computeIfAbsent(meta0.commitPartitionId(), k -> new HashSet<>());
+                                ids.add(new VacuumizableTx(txId, 
cleanupCompletionTimestamp));
 
                                 if (cleanupCompletionTimestamp != null) {
                                     cleanupCompletionTimestamps.put(txId, 
cleanupCompletionTimestamp);
@@ -189,8 +199,8 @@ public class VolatileTxStateMetaStorage {
         });
 
         return persistentVacuumOp.apply(txIds)
-                .thenAccept(successful -> {
-                    for (UUID txId : successful) {
+                .thenAccept(vacuumResult -> {
+                    for (UUID txId : vacuumResult.txnsToVacuum) {
                         txStateMap.compute(txId, (k, v) -> {
                             if (v == null) {
                                 return null;
@@ -218,7 +228,7 @@ public class VolatileTxStateMetaStorage {
                             vacuumObservationTimestamp,
                             txnResourceTtl,
                             vacuumizedTxnsCount,
-                            successful.size(),
+                            vacuumResult.vacuumizedPersistentTxnStatesCount,
                             markedAsInitiallyDetectedTxnsCount,
                             alreadyMarkedTxnsCount,
                             skippedForFurtherProcessingUnfinishedTxnsCount
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 918f849558..bee98ad32e 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -146,7 +146,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         HybridTimestamp commitTimestamp = clock.now();
 
-        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(tablePartitionId1, partitions, true, 
commitTimestamp, txId);
 
         assertThat(cleanup, willCompleteSuccessfully());
 
@@ -182,7 +182,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         HybridTimestamp commitTimestamp = clock.now();
 
-        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(tablePartitionId1, partitions, true, 
commitTimestamp, txId);
 
         assertThat(cleanup, willCompleteSuccessfully());
 
@@ -216,7 +216,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         HybridTimestamp commitTimestamp = clock.now();
 
-        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+        CompletableFuture<Void> cleanup = 
cleanupRequestSender.cleanup(tablePartitionId1, partitions, true, 
commitTimestamp, txId);
 
         assertThat(cleanup, willCompleteSuccessfully());
 


Reply via email to