This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f17393e231 IGNITE-21759 Txn state vacuum implemented. (#3469)
f17393e231 is described below

commit f17393e231d930342c59324b18b75529bd9823ba
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Mar 26 17:43:30 2024 +0200

    IGNITE-21759 Txn state vacuum implemented. (#3469)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |  5 ++
 .../rpc/impl/cli/GetLeaderRequestProcessor.java    |  1 -
 .../runner/app/ItIgniteNodeRestartTest.java        | 21 +++---
 .../org/apache/ignite/internal/app/IgniteImpl.java | 24 +++----
 .../exec/rel/TableScanNodeExecutionTest.java       | 10 ---
 .../apache/ignite/distributed/ItLockTableTest.java |  3 -
 ...xDistributedTestSingleNodeNoCleanupMessage.java |  3 -
 .../rebalance/ItRebalanceDistributedTest.java      | 10 ---
 .../ignite/internal/table/ItColocationTest.java    | 10 ---
 .../internal/table/ItTransactionRecoveryTest.java  |  4 +-
 .../apache/ignite/distributed/ItTxTestCluster.java | 61 +++++++++++-------
 .../table/impl/DummyInternalTableImpl.java         | 10 ---
 .../org/apache/ignite/internal/tx/TxManager.java   |  3 +
 .../org/apache/ignite/internal/tx/TxStateMeta.java | 15 +++--
 .../TransactionConfigurationSchema.java            |  6 ++
 .../impl/FinishedReadOnlyTransactionTracker.java   | 31 ++++++---
 ...anupManager.java => ResourceVacuumManager.java} | 74 ++++++++++++----------
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 16 ++---
 .../tx/impl/VolatileTxStateMetaStorage.java        | 65 +++++++++++++++++++
 .../apache/ignite/internal/tx/TxManagerTest.java   | 10 ---
 20 files changed, 225 insertions(+), 157 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 46e1d98a41..be9d5c2de7 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -200,6 +200,11 @@ public class FakeTxManager implements TxManager {
         return nullCompletedFuture();
     }
 
+    @Override
+    public void vacuum() {
+        // No-op.
+    }
+
     @Override
     public int finished() {
         return 0;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
index 70718b7604..1090447f81 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
@@ -92,7 +92,6 @@ public class GetLeaderRequestProcessor extends 
BaseCliRequestProcessor<GetLeader
             }
         }
         return RaftRpcFactory.DEFAULT //
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-21415 
Investigate correct RaftError
             .newResponse(msgFactory(), RaftError.UNKNOWN, "Unknown leader");
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 61a596bfef..69fa3187fa 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -180,7 +180,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -474,14 +474,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         TransactionInflights transactionInflights = new 
TransactionInflights(placementDriverManager.placementDriver());
 
-        ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                name,
-                resourcesRegistry,
-                clusterSvc.topologyService(),
-                clusterSvc.messagingService(),
-                transactionInflights
-        );
-
         var txManager = new TxManagerImpl(
                 name,
                 txConfiguration,
@@ -496,10 +488,18 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 new TestLocalRwTxCounter(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 resourcesRegistry,
-                resourceCleanupManager,
                 transactionInflights
         );
 
+        ResourceVacuumManager resourceVacuumManager = new 
ResourceVacuumManager(
+                name,
+                resourcesRegistry,
+                clusterSvc.topologyService(),
+                clusterSvc.messagingService(),
+                transactionInflights,
+                txManager
+        );
+
         Consumer<LongFunction<CompletableFuture<?>>> registry = (c) -> 
metaStorageMgr.registerRevisionUpdateListener(c::apply);
 
         DataStorageModules dataStorageModules = new 
DataStorageModules(ServiceLoader.load(DataStorageModule.class));
@@ -665,6 +665,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 cmgManager,
                 replicaMgr,
                 txManager,
+                resourceVacuumManager,
                 lowWatermark,
                 metaStorageMgr,
                 clusterCfgMgr,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 03d22bb7ec..e8c40b3b10 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -201,7 +201,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -378,7 +378,7 @@ public class IgniteImpl implements Ignite {
     private final IndexNodeFinishedRwTransactionsChecker 
indexNodeFinishedRwTransactionsChecker;
 
     /** Cleanup manager for tx resources. */
-    private final ResourceCleanupManager resourceCleanupManager;
+    private final ResourceVacuumManager resourceVacuumManager;
 
     /** Remote triggered resources registry. */
     private final RemotelyTriggeredResourceRegistry resourcesRegistry;
@@ -681,14 +681,6 @@ public class IgniteImpl implements Ignite {
 
         TransactionInflights transactionInflights = new 
TransactionInflights(placementDriverMgr.placementDriver());
 
-        resourceCleanupManager = new ResourceCleanupManager(
-                name,
-                resourcesRegistry,
-                clusterSvc.topologyService(),
-                messagingServiceReturningToStorageOperationsPool,
-                transactionInflights
-        );
-
         // TODO: IGNITE-19344 - use nodeId that is validated on join (and 
probably generated differently).
         txManager = new TxManagerImpl(
                 name,
@@ -704,10 +696,18 @@ public class IgniteImpl implements Ignite {
                 indexNodeFinishedRwTransactionsChecker,
                 threadPoolsManager.partitionOperationsExecutor(),
                 resourcesRegistry,
-                resourceCleanupManager,
                 transactionInflights
         );
 
+        resourceVacuumManager = new ResourceVacuumManager(
+                name,
+                resourcesRegistry,
+                clusterSvc.topologyService(),
+                messagingServiceReturningToStorageOperationsPool,
+                transactionInflights,
+                txManager
+        );
+
         StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
         lowWatermark = new LowWatermarkImpl(
@@ -1054,7 +1054,7 @@ public class IgniteImpl implements Ignite {
                                     clientHandlerModule,
                                     deploymentManager,
                                     sql,
-                                    resourceCleanupManager
+                                    resourceVacuumManager
                             );
 
                             // The system view manager comes last because 
other components
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 798ed7df46..8e5408b90a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -153,14 +152,6 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
 
             TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
 
-            ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                    leaseholder,
-                    resourcesRegistry,
-                    clusterService.topologyService(),
-                    clusterService.messagingService(),
-                    transactionInflights
-            );
-
             TxManagerImpl txManager = new TxManagerImpl(
                     txConfiguration,
                     clusterService,
@@ -172,7 +163,6 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                     new TestLocalRwTxCounter(),
                     resourcesRegistry,
-                    resourceCleanupManager,
                     transactionInflights
             );
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index b93a38fff4..c24e15e74a 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager.LockState;
 import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -134,7 +133,6 @@ public class ItLockTableTest extends IgniteAbstractTest {
                     ClusterNode node,
                     PlacementDriver placementDriver,
                     RemotelyTriggeredResourceRegistry resourcesRegistry,
-                    ResourceCleanupManager resourceCleanupManager,
                     TransactionInflights transactionInflights
             ) {
                 return new TxManagerImpl(
@@ -152,7 +150,6 @@ public class ItLockTableTest extends IgniteAbstractTest {
                         () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                         new TestLocalRwTxCounter(),
                         resourcesRegistry,
-                        resourceCleanupManager,
                         transactionInflights
                 );
             }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 951b19dd08..721690eae6 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -117,7 +116,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                     ClusterNode node,
                     PlacementDriver placementDriver,
                     RemotelyTriggeredResourceRegistry resourcesRegistry,
-                    ResourceCleanupManager resourceCleanupManager,
                     TransactionInflights transactionInflights
             ) {
                 return new TxManagerImpl(
@@ -131,7 +129,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                         new TestLocalRwTxCounter(),
                         resourcesRegistry,
-                        resourceCleanupManager,
                         transactionInflights
                 ) {
                     @Override
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 85452dafb0..2d6001828f 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -188,7 +188,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -1084,14 +1083,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
             TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
 
-            ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                    name,
-                    resourcesRegistry,
-                    clusterService.topologyService(),
-                    clusterService.messagingService(),
-                    transactionInflights
-            );
-
             cfgStorage = new DistributedConfigurationStorage("test", 
metaStorageManager);
 
             clusterCfgGenerator = new 
ConfigurationTreeGenerator(GcConfiguration.KEY);
@@ -1151,7 +1142,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     partitionIdleSafeTimePropagationPeriodMsSupplier,
                     new TestLocalRwTxCounter(),
                     resourcesRegistry,
-                    resourceCleanupManager,
                     transactionInflights
             );
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 8f5ceab78a..a25c230ec5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -100,7 +100,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -173,14 +172,6 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
 
         TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
 
-        ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                clusterNode.name(),
-                resourcesRegistry,
-                clusterService.topologyService(),
-                clusterService.messagingService(),
-                transactionInflights
-        );
-
         txManager = new TxManagerImpl(
                 txConfiguration,
                 clusterService,
@@ -192,7 +183,6 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 new TestLocalRwTxCounter(),
                 resourcesRegistry,
-                resourceCleanupManager,
                 transactionInflights
         ) {
             @Override
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index 16f057613e..7d02bc4df6 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -22,7 +22,7 @@ import static 
org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static 
org.apache.ignite.internal.tx.impl.ResourceCleanupManager.RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY;
+import static 
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -98,7 +98,7 @@ import org.junit.jupiter.params.provider.ValueSource;
  * Abandoned transactions integration tests.
  */
 @ExtendWith(SystemPropertiesExtension.class)
-@WithSystemProperty(key = RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY, 
value = "500")
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, 
value = "500")
 public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest {
     private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index f4435aa46d..2b88c02795 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -136,7 +136,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -207,6 +207,8 @@ public class ItTxTestCluster {
 
     protected Map<String, TxManager> txManagers;
 
+    private Map<String, ResourceVacuumManager> resourceCleanupManagers;
+
     protected Map<String, TransactionInflights> txInflights;
 
     protected Map<String, RemotelyTriggeredResourceRegistry> cursorRegistries;
@@ -215,6 +217,8 @@ public class ItTxTestCluster {
 
     protected TxManager clientTxManager;
 
+    private ResourceVacuumManager clientResourceVacuumManager;
+
     protected TransactionStateResolver clientTxStateResolver;
 
     protected Map<String, List<RaftGroupService>> raftClients = new 
HashMap<>();
@@ -344,6 +348,7 @@ public class ItTxTestCluster {
         replicaManagers = new HashMap<>(nodes);
         replicaServices = new HashMap<>(nodes);
         txManagers = new HashMap<>(nodes);
+        resourceCleanupManagers = new HashMap<>(nodes);
         txInflights = new HashMap<>(nodes);
         cursorRegistries = new HashMap<>(nodes);
         txStateStorages = new HashMap<>(nodes);
@@ -415,14 +420,6 @@ public class ItTxTestCluster {
 
             txInflights.put(node.name(), transactionInflights);
 
-            ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                    node.name(),
-                    resourcesRegistry,
-                    clusterService.topologyService(),
-                    clusterService.messagingService(),
-                    transactionInflights
-            );
-
             cursorRegistries.put(node.name(), resourcesRegistry);
 
             TxManagerImpl txMgr = newTxManager(
@@ -433,14 +430,24 @@ public class ItTxTestCluster {
                     node,
                     placementDriver,
                     resourcesRegistry,
-                    resourceCleanupManager,
                     transactionInflights
             );
 
-            txMgr.start();
+            ResourceVacuumManager resourceVacuumManager = new 
ResourceVacuumManager(
+                    node.name(),
+                    resourcesRegistry,
+                    clusterService.topologyService(),
+                    clusterService.messagingService(),
+                    transactionInflights,
+                    txMgr
+            );
 
+            txMgr.start();
             txManagers.put(node.name(), txMgr);
 
+            resourceVacuumManager.start();
+            resourceCleanupManagers.put(node.name(), resourceVacuumManager);
+
             txStateStorages.put(node.name(), new TestTxStateStorage());
         }
 
@@ -455,6 +462,7 @@ public class ItTxTestCluster {
         } else {
             // Collocated mode.
             clientTxManager = txManagers.get(localNodeName);
+            clientResourceVacuumManager = 
resourceCleanupManagers.get(localNodeName);
             clientTransactionInflights = txInflights.get(localNodeName);
         }
 
@@ -471,7 +479,6 @@ public class ItTxTestCluster {
             ClusterNode node,
             PlacementDriver placementDriver,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            ResourceCleanupManager resourceCleanupManager,
             TransactionInflights transactionInflights
     ) {
         return new TxManagerImpl(
@@ -488,7 +495,6 @@ public class ItTxTestCluster {
                 new TestLocalRwTxCounter(),
                 partitionOperationsExecutor,
                 resourcesRegistry,
-                resourceCleanupManager,
                 transactionInflights
         );
     }
@@ -895,6 +901,16 @@ public class ItTxTestCluster {
             }
         }
 
+        if (resourceCleanupManagers != null) {
+            for (ResourceVacuumManager resourceVacuumManager : 
resourceCleanupManagers.values()) {
+                resourceVacuumManager.stop();
+            }
+        }
+
+        if (clientResourceVacuumManager != null) {
+            clientResourceVacuumManager.stop();
+        }
+
         if (txManagers != null) {
             for (TxManager txMgr : txManagers.values()) {
                 txMgr.stop();
@@ -954,14 +970,6 @@ public class ItTxTestCluster {
 
         clientTransactionInflights = new TransactionInflights(placementDriver);
 
-        ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                "client",
-                resourceRegistry,
-                client.topologyService(),
-                client.messagingService(),
-                clientTransactionInflights
-        );
-
         clientTxManager = new TxManagerImpl(
                 "client",
                 txConfiguration,
@@ -976,10 +984,18 @@ public class ItTxTestCluster {
                 new TestLocalRwTxCounter(),
                 partitionOperationsExecutor,
                 resourceRegistry,
-                resourceCleanupManager,
                 clientTransactionInflights
         );
 
+        clientResourceVacuumManager = new ResourceVacuumManager(
+                "client",
+                resourceRegistry,
+                client.topologyService(),
+                client.messagingService(),
+                clientTransactionInflights,
+                clientTxManager
+        );
+
         clientTxStateResolver = new TransactionStateResolver(
                 clientTxManager,
                 clientClockService,
@@ -991,6 +1007,7 @@ public class ItTxTestCluster {
 
         clientTxStateResolver.start();
         clientTxManager.start();
+        clientResourceVacuumManager.start();
     }
 
     public Map<String, Loza> raftServers() {
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 72e96efef1..64b3e6a2d1 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -97,7 +97,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -468,14 +467,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
 
-        ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
-                LOCAL_NODE.name(),
-                resourcesRegistry,
-                clusterService.topologyService(),
-                clusterService.messagingService(),
-                transactionInflights
-        );
-
         var txManager = new TxManagerImpl(
                 txConfiguration,
                 clusterService,
@@ -487,7 +478,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 new TestLocalRwTxCounter(),
                 resourcesRegistry,
-                resourceCleanupManager,
                 transactionInflights
         );
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 829d39f540..2d3ea54097 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -164,6 +164,9 @@ public interface TxManager extends IgniteComponent {
      */
     CompletableFuture<Void> cleanup(String node, UUID txId);
 
+    /** Locally vacuums transactional resources that are no longer needed, 
such as txnState, both persistent and volatile. */
+    void vacuum();
+
     /**
      * Returns a number of finished transactions.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 7cb0917710..58bf55228b 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -41,6 +41,8 @@ public class TxStateMeta implements TransactionMeta {
 
     private final HybridTimestamp commitTimestamp;
 
+    private final Long initialVacuumObservationTimestamp;
+
     /**
      * Constructor.
      *
@@ -55,7 +57,7 @@ public class TxStateMeta implements TransactionMeta {
             @Nullable TablePartitionId commitPartitionId,
             @Nullable HybridTimestamp commitTimestamp
     ) {
-        this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 0);
+        this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 
null);
     }
 
     /**
@@ -65,19 +67,20 @@ public class TxStateMeta implements TransactionMeta {
      * @param txCoordinatorId Transaction coordinator id.
      * @param commitPartitionId Commit partition replication group id.
      * @param commitTimestamp Commit timestamp.
-     * @param lastAbandonedMarkerTs Timestamp indicates when the transaction 
is marked as abandoned.
+     * @param initialVacuumObservationTimestamp Timestamp of the first vacuum attempt that observed this 
transaction in a final state, or {@code null} if it has not been observed yet.
      */
-    private TxStateMeta(
+    public TxStateMeta(
             TxState txState,
             @Nullable String txCoordinatorId,
             @Nullable TablePartitionId commitPartitionId,
             @Nullable HybridTimestamp commitTimestamp,
-            long lastAbandonedMarkerTs
+            @Nullable Long initialVacuumObservationTimestamp
     ) {
         this.txState = txState;
         this.txCoordinatorId = txCoordinatorId;
         this.commitPartitionId = commitPartitionId;
         this.commitTimestamp = commitTimestamp;
+        this.initialVacuumObservationTimestamp = 
initialVacuumObservationTimestamp;
     }
 
     /**
@@ -118,6 +121,10 @@ public class TxStateMeta implements TransactionMeta {
         return commitTimestamp;
     }
 
+    public @Nullable Long initialVacuumObservationTimestamp() {
+        return initialVacuumObservationTimestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
index 84e447cb43..016a1d3b79 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx.configuration;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.annotation.Value;
@@ -44,4 +45,9 @@ public class TransactionConfigurationSchema {
     @Range(min = 0)
     @Value(hasDefault = true)
     public final int attemptsObtainLock = 3;
+
+    /** Transaction resource time to live (ms), the minimum lifetime of a 
transaction state. */
+    @Value(hasDefault = true)
+    @Range(min = 0)
+    public long txnResourceTtl = TimeUnit.SECONDS.toMillis(30);
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
index bac0426475..2e9c9feb06 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -32,6 +34,8 @@ import org.apache.ignite.network.TopologyService;
  * Keeps track of all finished RO transactions.
  */
 public class FinishedReadOnlyTransactionTracker {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FinishedReadOnlyTransactionTracker.class);
+
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
 
@@ -65,18 +69,25 @@ public class FinishedReadOnlyTransactionTracker {
      * Send close cursors batch message to all cluster nodes.
      */
     public void broadcastClosedTransactions() {
-        Collection<UUID> txToSend = 
transactionInflights.finishedReadOnlyTransactions();
+        try {
+            Collection<UUID> txToSend = 
transactionInflights.finishedReadOnlyTransactions();
+
+            if (!txToSend.isEmpty()) {
+                FinishedTransactionsBatchMessage message = 
FACTORY.finishedTransactionsBatchMessage()
+                        .transactions(txToSend)
+                        .build();
 
-        if (!txToSend.isEmpty()) {
-            FinishedTransactionsBatchMessage message = 
FACTORY.finishedTransactionsBatchMessage()
-                    .transactions(txToSend)
-                    .build();
+                CompletableFuture<?>[] messages = topologyService.allMembers()
+                        .stream()
+                        .map(clusterNode -> 
sendCursorCleanupCommand(clusterNode, message))
+                        .toArray(CompletableFuture[]::new);
+                allOf(messages).thenRun(() -> 
transactionInflights.removeTxContexts(txToSend));
+            }
+        } catch (Throwable err) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-21829 Use 
failure handler instead.
+            LOG.error("Error occurred during broadcasting closed 
transactions.", err);
 
-            CompletableFuture<?>[] messages = topologyService.allMembers()
-                    .stream()
-                    .map(clusterNode -> sendCursorCleanupCommand(clusterNode, 
message))
-                    .toArray(CompletableFuture[]::new);
-            allOf(messages).thenRun(() -> 
transactionInflights.removeTxContexts(txToSend));
+            throw err;
         }
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
similarity index 68%
rename from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
rename to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
index cb51853794..564a871414 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +32,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNodeResolver;
 import org.apache.ignite.network.TopologyService;
@@ -40,17 +40,17 @@ import org.apache.ignite.network.TopologyService;
 /**
  * Manager responsible for cleaning up the transaction resources.
  */
-public class ResourceCleanupManager implements IgniteComponent {
+public class ResourceVacuumManager implements IgniteComponent {
     /** The logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(ResourceCleanupManager.class);
+    private static final IgniteLogger LOG = 
Loggers.forClass(ResourceVacuumManager.class);
 
-    private static final int RESOURCE_CLEANUP_EXECUTOR_SIZE = 1;
+    private static final int RESOURCE_VACUUM_EXECUTOR_SIZE = 1;
 
     /** System property name. */
-    public static final String RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY 
= "RESOURCE_CLEANUP_INTERVAL_MILLISECONDS";
+    public static final String RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY 
= "RESOURCE_VACUUM_INTERVAL_MILLISECONDS";
 
-    private final int resourceCleanupIntervalMilliseconds = 
IgniteSystemProperties
-            .getInteger(RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY, 
30_000);
+    private final int resourceVacuumIntervalMilliseconds = 
IgniteSystemProperties
+            .getInteger(RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, 
30_000);
 
     private final FinishedReadOnlyTransactionTracker 
finishedReadOnlyTransactionTracker;
 
@@ -61,12 +61,14 @@ public class ResourceCleanupManager implements 
IgniteComponent {
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    private final ScheduledExecutorService resourceCleanupExecutor;
+    private final ScheduledExecutorService resourceVacuumExecutor;
 
     private final RemotelyTriggeredResourceRegistry resourceRegistry;
 
     private final ClusterNodeResolver clusterNodeResolver;
 
+    private final TxManager txManager;
+
     /**
      * Constructor.
      *
@@ -75,19 +77,21 @@ public class ResourceCleanupManager implements 
IgniteComponent {
      * @param topologyService Topology service.
      * @param messagingService Messaging service.
      * @param transactionInflights Transaction inflights.
+     * @param txManager Transactional manager.
      */
-    public ResourceCleanupManager(
+    public ResourceVacuumManager(
             String nodeName,
             RemotelyTriggeredResourceRegistry resourceRegistry,
             TopologyService topologyService,
             MessagingService messagingService,
-            TransactionInflights transactionInflights
+            TransactionInflights transactionInflights,
+            TxManager txManager
     ) {
         this.resourceRegistry = resourceRegistry;
         this.clusterNodeResolver = topologyService;
-        this.resourceCleanupExecutor = Executors.newScheduledThreadPool(
-                RESOURCE_CLEANUP_EXECUTOR_SIZE,
-                NamedThreadFactory.create(nodeName, 
"resource-cleanup-executor", LOG)
+        this.resourceVacuumExecutor = Executors.newScheduledThreadPool(
+                RESOURCE_VACUUM_EXECUTOR_SIZE,
+                NamedThreadFactory.create(nodeName, 
"resource-vacuum-executor", LOG)
         );
         this.finishedReadOnlyTransactionTracker = new 
FinishedReadOnlyTransactionTracker(
                 topologyService,
@@ -95,22 +99,24 @@ public class ResourceCleanupManager implements 
IgniteComponent {
                 transactionInflights
         );
         this.finishedTransactionBatchRequestHandler =
-                new FinishedTransactionBatchRequestHandler(messagingService, 
resourceRegistry, resourceCleanupExecutor);
+                new FinishedTransactionBatchRequestHandler(messagingService, 
resourceRegistry, resourceVacuumExecutor);
+
+        this.txManager = txManager;
     }
 
     @Override
     public CompletableFuture<Void> start() {
-        resourceCleanupExecutor.scheduleAtFixedRate(
-                this::runCleanupOperations,
+        resourceVacuumExecutor.scheduleAtFixedRate(
+                this::runVacuumOperations,
                 0,
-                resourceCleanupIntervalMilliseconds,
+                resourceVacuumIntervalMilliseconds,
                 TimeUnit.MILLISECONDS
         );
 
-        resourceCleanupExecutor.scheduleAtFixedRate(
+        resourceVacuumExecutor.scheduleAtFixedRate(
                 
finishedReadOnlyTransactionTracker::broadcastClosedTransactions,
                 0,
-                resourceCleanupIntervalMilliseconds,
+                resourceVacuumIntervalMilliseconds,
                 TimeUnit.MILLISECONDS
         );
 
@@ -123,23 +129,15 @@ public class ResourceCleanupManager implements 
IgniteComponent {
     public void stop() throws Exception {
         busyLock.block();
 
-        shutdownAndAwaitTermination(resourceCleanupExecutor, 10, 
TimeUnit.SECONDS);
-    }
-
-    /**
-     * Is called on the finish of read only transaction.
-     *
-     * @param id Transaction id.
-     */
-    void onReadOnlyTransactionFinished(UUID id) {
-        finishedReadOnlyTransactionTracker.onTransactionFinished(id);
+        shutdownAndAwaitTermination(resourceVacuumExecutor, 10, 
TimeUnit.SECONDS);
     }
 
-    private void runCleanupOperations() {
-        inBusyLock(busyLock, this::cleanupOrphanTxResources);
+    private void runVacuumOperations() {
+        inBusyLock(busyLock, this::vacuumOrphanTxResources);
+        inBusyLock(busyLock, this::vacuumTxnResources);
     }
 
-    private void cleanupOrphanTxResources() {
+    private void vacuumOrphanTxResources() {
         try {
             Set<String> remoteHosts = resourceRegistry.registeredRemoteHosts();
 
@@ -149,9 +147,21 @@ public class ResourceCleanupManager implements 
IgniteComponent {
                 }
             }
         } catch (Throwable err) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-21829 Use 
failure handler instead.
             LOG.error("Error occurred during the orphan resources closing.", 
err);
 
             throw err;
         }
     }
+
+    private void vacuumTxnResources() {
+        try {
+            txManager.vacuum();
+        } catch (Throwable err) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-21829 Use 
failure handler instead.
+            LOG.error("Error occurred during txn resources vacuum.", err);
+
+            throw err;
+        }
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index c627246ec4..329fd6d3f3 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -196,9 +196,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     private final Executor partitionOperationsExecutor;
 
-    /** Cleanup manager for tx resources. */
-    private final ResourceCleanupManager resourceCleanupManager;
-
     private final TransactionInflights transactionInflights;
 
     /**
@@ -228,7 +225,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             LongSupplier idleSafeTimePropagationPeriodMsSupplier,
             LocalRwTxCounter localRwTxCounter,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            ResourceCleanupManager resourceCleanupManager,
             TransactionInflights transactionInflights
     ) {
         this(
@@ -245,7 +241,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 localRwTxCounter,
                 ForkJoinPool.commonPool(),
                 resourcesRegistry,
-                resourceCleanupManager,
                 transactionInflights
         );
     }
@@ -281,7 +276,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             LocalRwTxCounter localRwTxCounter,
             Executor partitionOperationsExecutor,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            ResourceCleanupManager resourceCleanupManager,
             TransactionInflights transactionInflights
     ) {
         this.txConfig = txConfig;
@@ -295,7 +289,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.primaryReplicaEventListener = this::primaryReplicaEventListener;
         this.localRwTxCounter = localRwTxCounter;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
-        this.resourceCleanupManager = resourceCleanupManager;
         this.transactionInflights = transactionInflights;
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, 
clockService);
@@ -771,6 +764,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         return txCleanupRequestSender.cleanup(node, txId);
     }
 
+    @Override
+    public void vacuum() {
+        long vacuumObservationTimestamp = System.currentTimeMillis();
+
+        txStateVolatileStorage.vacuum(vacuumObservationTimestamp, 
txConfig.txnResourceTtl().value());
+    }
+
     @Override
     public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable 
runnable) {
         return runAsync(runnable, writeIntentSwitchPool);
@@ -787,7 +787,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         UUID txId = txIdAndTimestamp.getTxId();
 
-        resourceCleanupManager.onReadOnlyTransactionFinished(txId);
+        transactionInflights.markReadOnlyTxFinished(txId);
 
         return readOnlyTxFuture;
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
index ea9f810a23..a575318ebe 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -23,7 +23,10 @@ import static 
org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.jetbrains.annotations.Nullable;
@@ -32,6 +35,7 @@ import org.jetbrains.annotations.Nullable;
  * The class represents volatile transaction state storage that stores a 
transaction state meta until the node stops.
  */
 public class VolatileTxStateMetaStorage {
+    private static final IgniteLogger LOG = 
Loggers.forClass(VolatileTxStateMetaStorage.class);
     /** The local map for tx states. */
     private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
 
@@ -100,4 +104,65 @@ public class VolatileTxStateMetaStorage {
     public Collection<TxStateMeta> states() {
         return txStateMap.values();
     }
+
+    /**
+     * Locally vacuums transactional resources that are no longer needed.
+     * For each finished (COMMITTED or ABORTED) transaction:
+     * <ol>
+     *     <li>Removes it from the volatile storage if txnResourceTtl == 0 
or if
+     *     txnState.initialVacuumObservationTimestamp + txnResourceTtl < 
vacuumObservationTimestamp.</li>
+     *     <li>Updates txnState.initialVacuumObservationTimestamp by setting 
it to vacuumObservationTimestamp
+     *     if it's not already initialized.</li>
+     * </ol>
+     *
+     * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
+     * @param txnResourceTtl Transactional resource time to live in 
milliseconds.
+     */
+    public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+        LOG.info("Vacuum started [vacuumObservationTimestamp={}, 
txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
+
+        AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
+        AtomicInteger markedAsInitiallyDetectedTxnsCount = new 
AtomicInteger(0);
+        AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
+        AtomicInteger skippedFotFurtherProcessingUnfinishedTxnsCount = new 
AtomicInteger(0);
+
+        txStateMap.forEach((txId, meta) -> {
+            txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
+                if (TxState.isFinalState(meta0.txState())) {
+                    if (txnResourceTtl == 0) {
+                        vacuumizedTxnsCount.incrementAndGet();
+                        return null;
+                    } else if (meta0.initialVacuumObservationTimestamp() == 
null) {
+                        markedAsInitiallyDetectedTxnsCount.incrementAndGet();
+                        return new TxStateMeta(
+                                meta0.txState(),
+                                meta0.txCoordinatorId(),
+                                meta0.commitPartitionId(),
+                                meta0.commitTimestamp(),
+                                vacuumObservationTimestamp
+                        );
+                    } else if (meta0.initialVacuumObservationTimestamp() + 
txnResourceTtl < vacuumObservationTimestamp) {
+                        vacuumizedTxnsCount.incrementAndGet();
+                        return null;
+                    } else {
+                        alreadyMarkedTxnsCount.incrementAndGet();
+                        return meta0;
+                    }
+                } else {
+                    
skippedFotFurtherProcessingUnfinishedTxnsCount.incrementAndGet();
+                    return meta0;
+                }
+            });
+        });
+
+        LOG.info("Vacuum finished [vacuumObservationTimestamp={}, 
txnResourceTtl={}, vacuumizedTxnsCount={},"
+                + " markedAsInitiallyDetectedTxnsCount={}, 
alreadyMarkedTxnsCount={}, skippedFotFurtherProcessingUnfinishedTxnsCount={}].",
+                vacuumObservationTimestamp,
+                txnResourceTtl,
+                vacuumizedTxnsCount,
+                markedAsInitiallyDetectedTxnsCount,
+                alreadyMarkedTxnsCount,
+                skippedFotFurtherProcessingUnfinishedTxnsCount
+        );
+    }
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index ada690c777..12d5ac964d 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -76,7 +76,6 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -146,14 +145,6 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
 
-        ResourceCleanupManager cleanupManager = new ResourceCleanupManager(
-                LOCAL_NODE.name(),
-                resourceRegistry,
-                clusterService.topologyService(),
-                clusterService.messagingService(),
-                transactionInflights
-        );
-
         txManager = new TxManagerImpl(
                 txConfiguration,
                 clusterService,
@@ -165,7 +156,6 @@ public class TxManagerTest extends IgniteAbstractTest {
                 idleSafeTimePropagationPeriodMsSupplier,
                 localRwTxCounter,
                 resourceRegistry,
-                cleanupManager,
                 transactionInflights
         );
 

Reply via email to