This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f17393e231 IGNITE-21759 Txn state vacuum implemented. (#3469)
f17393e231 is described below
commit f17393e231d930342c59324b18b75529bd9823ba
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Mar 26 17:43:30 2024 +0200
IGNITE-21759 Txn state vacuum implemented. (#3469)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 5 ++
.../rpc/impl/cli/GetLeaderRequestProcessor.java | 1 -
.../runner/app/ItIgniteNodeRestartTest.java | 21 +++---
.../org/apache/ignite/internal/app/IgniteImpl.java | 24 +++----
.../exec/rel/TableScanNodeExecutionTest.java | 10 ---
.../apache/ignite/distributed/ItLockTableTest.java | 3 -
...xDistributedTestSingleNodeNoCleanupMessage.java | 3 -
.../rebalance/ItRebalanceDistributedTest.java | 10 ---
.../ignite/internal/table/ItColocationTest.java | 10 ---
.../internal/table/ItTransactionRecoveryTest.java | 4 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 61 +++++++++++-------
.../table/impl/DummyInternalTableImpl.java | 10 ---
.../org/apache/ignite/internal/tx/TxManager.java | 3 +
.../org/apache/ignite/internal/tx/TxStateMeta.java | 15 +++--
.../TransactionConfigurationSchema.java | 6 ++
.../impl/FinishedReadOnlyTransactionTracker.java | 31 ++++++---
...anupManager.java => ResourceVacuumManager.java} | 74 ++++++++++++----------
.../ignite/internal/tx/impl/TxManagerImpl.java | 16 ++---
.../tx/impl/VolatileTxStateMetaStorage.java | 65 +++++++++++++++++++
.../apache/ignite/internal/tx/TxManagerTest.java | 10 ---
20 files changed, 225 insertions(+), 157 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 46e1d98a41..be9d5c2de7 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -200,6 +200,11 @@ public class FakeTxManager implements TxManager {
return nullCompletedFuture();
}
+ @Override
+ public void vacuum() {
+ // No-op.
+ }
+
@Override
public int finished() {
return 0;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
index 70718b7604..1090447f81 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
@@ -92,7 +92,6 @@ public class GetLeaderRequestProcessor extends
BaseCliRequestProcessor<GetLeader
}
}
return RaftRpcFactory.DEFAULT //
- // TODO: https://issues.apache.org/jira/browse/IGNITE-21415
Investigate correct RaftError
.newResponse(msgFactory(), RaftError.UNKNOWN, "Unknown leader");
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 61a596bfef..69fa3187fa 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -180,7 +180,7 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -474,14 +474,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
TransactionInflights transactionInflights = new
TransactionInflights(placementDriverManager.placementDriver());
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- name,
- resourcesRegistry,
- clusterSvc.topologyService(),
- clusterSvc.messagingService(),
- transactionInflights
- );
-
var txManager = new TxManagerImpl(
name,
txConfiguration,
@@ -496,10 +488,18 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
new TestLocalRwTxCounter(),
threadPoolsManager.partitionOperationsExecutor(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
+ ResourceVacuumManager resourceVacuumManager = new
ResourceVacuumManager(
+ name,
+ resourcesRegistry,
+ clusterSvc.topologyService(),
+ clusterSvc.messagingService(),
+ transactionInflights,
+ txManager
+ );
+
Consumer<LongFunction<CompletableFuture<?>>> registry = (c) ->
metaStorageMgr.registerRevisionUpdateListener(c::apply);
DataStorageModules dataStorageModules = new
DataStorageModules(ServiceLoader.load(DataStorageModule.class));
@@ -665,6 +665,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
cmgManager,
replicaMgr,
txManager,
+ resourceVacuumManager,
lowWatermark,
metaStorageMgr,
clusterCfgMgr,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 03d22bb7ec..e8c40b3b10 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -201,7 +201,7 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -378,7 +378,7 @@ public class IgniteImpl implements Ignite {
private final IndexNodeFinishedRwTransactionsChecker
indexNodeFinishedRwTransactionsChecker;
/** Cleanup manager for tx resources. */
- private final ResourceCleanupManager resourceCleanupManager;
+ private final ResourceVacuumManager resourceVacuumManager;
/** Remote triggered resources registry. */
private final RemotelyTriggeredResourceRegistry resourcesRegistry;
@@ -681,14 +681,6 @@ public class IgniteImpl implements Ignite {
TransactionInflights transactionInflights = new
TransactionInflights(placementDriverMgr.placementDriver());
- resourceCleanupManager = new ResourceCleanupManager(
- name,
- resourcesRegistry,
- clusterSvc.topologyService(),
- messagingServiceReturningToStorageOperationsPool,
- transactionInflights
- );
-
// TODO: IGNITE-19344 - use nodeId that is validated on join (and
probably generated differently).
txManager = new TxManagerImpl(
name,
@@ -704,10 +696,18 @@ public class IgniteImpl implements Ignite {
indexNodeFinishedRwTransactionsChecker,
threadPoolsManager.partitionOperationsExecutor(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
+ resourceVacuumManager = new ResourceVacuumManager(
+ name,
+ resourcesRegistry,
+ clusterSvc.topologyService(),
+ messagingServiceReturningToStorageOperationsPool,
+ transactionInflights,
+ txManager
+ );
+
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
lowWatermark = new LowWatermarkImpl(
@@ -1054,7 +1054,7 @@ public class IgniteImpl implements Ignite {
clientHandlerModule,
deploymentManager,
sql,
- resourceCleanupManager
+ resourceVacuumManager
);
// The system view manager comes last because
other components
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 798ed7df46..8e5408b90a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -153,14 +152,6 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- leaseholder,
- resourcesRegistry,
- clusterService.topologyService(),
- clusterService.messagingService(),
- transactionInflights
- );
-
TxManagerImpl txManager = new TxManagerImpl(
txConfiguration,
clusterService,
@@ -172,7 +163,6 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
() ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index b93a38fff4..c24e15e74a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager.LockState;
import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -134,7 +133,6 @@ public class ItLockTableTest extends IgniteAbstractTest {
ClusterNode node,
PlacementDriver placementDriver,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager,
TransactionInflights transactionInflights
) {
return new TxManagerImpl(
@@ -152,7 +150,6 @@ public class ItLockTableTest extends IgniteAbstractTest {
() ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 951b19dd08..721690eae6 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -117,7 +116,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
ClusterNode node,
PlacementDriver placementDriver,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager,
TransactionInflights transactionInflights
) {
return new TxManagerImpl(
@@ -131,7 +129,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
() ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
) {
@Override
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 85452dafb0..2d6001828f 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -188,7 +188,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -1084,14 +1083,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- name,
- resourcesRegistry,
- clusterService.topologyService(),
- clusterService.messagingService(),
- transactionInflights
- );
-
cfgStorage = new DistributedConfigurationStorage("test",
metaStorageManager);
clusterCfgGenerator = new
ConfigurationTreeGenerator(GcConfiguration.KEY);
@@ -1151,7 +1142,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
partitionIdleSafeTimePropagationPeriodMsSupplier,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 8f5ceab78a..a25c230ec5 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -100,7 +100,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -173,14 +172,6 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- clusterNode.name(),
- resourcesRegistry,
- clusterService.topologyService(),
- clusterService.messagingService(),
- transactionInflights
- );
-
txManager = new TxManagerImpl(
txConfiguration,
clusterService,
@@ -192,7 +183,6 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
) {
@Override
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index 16f057613e..7d02bc4df6 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -22,7 +22,7 @@ import static
org.apache.ignite.internal.SessionUtils.executeUpdate;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static
org.apache.ignite.internal.tx.impl.ResourceCleanupManager.RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY;
+import static
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -98,7 +98,7 @@ import org.junit.jupiter.params.provider.ValueSource;
* Abandoned transactions integration tests.
*/
@ExtendWith(SystemPropertiesExtension.class)
-@WithSystemProperty(key = RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY,
value = "500")
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY,
value = "500")
public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest {
private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index f4435aa46d..2b88c02795 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -136,7 +136,7 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -207,6 +207,8 @@ public class ItTxTestCluster {
protected Map<String, TxManager> txManagers;
+ private Map<String, ResourceVacuumManager> resourceCleanupManagers;
+
protected Map<String, TransactionInflights> txInflights;
protected Map<String, RemotelyTriggeredResourceRegistry> cursorRegistries;
@@ -215,6 +217,8 @@ public class ItTxTestCluster {
protected TxManager clientTxManager;
+ private ResourceVacuumManager clientResourceVacuumManager;
+
protected TransactionStateResolver clientTxStateResolver;
protected Map<String, List<RaftGroupService>> raftClients = new
HashMap<>();
@@ -344,6 +348,7 @@ public class ItTxTestCluster {
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
+ resourceCleanupManagers = new HashMap<>(nodes);
txInflights = new HashMap<>(nodes);
cursorRegistries = new HashMap<>(nodes);
txStateStorages = new HashMap<>(nodes);
@@ -415,14 +420,6 @@ public class ItTxTestCluster {
txInflights.put(node.name(), transactionInflights);
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- node.name(),
- resourcesRegistry,
- clusterService.topologyService(),
- clusterService.messagingService(),
- transactionInflights
- );
-
cursorRegistries.put(node.name(), resourcesRegistry);
TxManagerImpl txMgr = newTxManager(
@@ -433,14 +430,24 @@ public class ItTxTestCluster {
node,
placementDriver,
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
- txMgr.start();
+ ResourceVacuumManager resourceVacuumManager = new
ResourceVacuumManager(
+ node.name(),
+ resourcesRegistry,
+ clusterService.topologyService(),
+ clusterService.messagingService(),
+ transactionInflights,
+ txMgr
+ );
+ txMgr.start();
txManagers.put(node.name(), txMgr);
+ resourceVacuumManager.start();
+ resourceCleanupManagers.put(node.name(), resourceVacuumManager);
+
txStateStorages.put(node.name(), new TestTxStateStorage());
}
@@ -455,6 +462,7 @@ public class ItTxTestCluster {
} else {
// Collocated mode.
clientTxManager = txManagers.get(localNodeName);
+ clientResourceVacuumManager =
resourceCleanupManagers.get(localNodeName);
clientTransactionInflights = txInflights.get(localNodeName);
}
@@ -471,7 +479,6 @@ public class ItTxTestCluster {
ClusterNode node,
PlacementDriver placementDriver,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager,
TransactionInflights transactionInflights
) {
return new TxManagerImpl(
@@ -488,7 +495,6 @@ public class ItTxTestCluster {
new TestLocalRwTxCounter(),
partitionOperationsExecutor,
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
}
@@ -895,6 +901,16 @@ public class ItTxTestCluster {
}
}
+ if (resourceCleanupManagers != null) {
+ for (ResourceVacuumManager resourceVacuumManager :
resourceCleanupManagers.values()) {
+ resourceVacuumManager.stop();
+ }
+ }
+
+ if (clientResourceVacuumManager != null) {
+ clientResourceVacuumManager.stop();
+ }
+
if (txManagers != null) {
for (TxManager txMgr : txManagers.values()) {
txMgr.stop();
@@ -954,14 +970,6 @@ public class ItTxTestCluster {
clientTransactionInflights = new TransactionInflights(placementDriver);
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- "client",
- resourceRegistry,
- client.topologyService(),
- client.messagingService(),
- clientTransactionInflights
- );
-
clientTxManager = new TxManagerImpl(
"client",
txConfiguration,
@@ -976,10 +984,18 @@ public class ItTxTestCluster {
new TestLocalRwTxCounter(),
partitionOperationsExecutor,
resourceRegistry,
- resourceCleanupManager,
clientTransactionInflights
);
+ clientResourceVacuumManager = new ResourceVacuumManager(
+ "client",
+ resourceRegistry,
+ client.topologyService(),
+ client.messagingService(),
+ clientTransactionInflights,
+ clientTxManager
+ );
+
clientTxStateResolver = new TransactionStateResolver(
clientTxManager,
clientClockService,
@@ -991,6 +1007,7 @@ public class ItTxTestCluster {
clientTxStateResolver.start();
clientTxManager.start();
+ clientResourceVacuumManager.start();
}
public Map<String, Loza> raftServers() {
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 72e96efef1..64b3e6a2d1 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -97,7 +97,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -468,14 +467,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
- ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
- LOCAL_NODE.name(),
- resourcesRegistry,
- clusterService.topologyService(),
- clusterService.messagingService(),
- transactionInflights
- );
-
var txManager = new TxManagerImpl(
txConfiguration,
clusterService,
@@ -487,7 +478,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 829d39f540..2d3ea54097 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -164,6 +164,9 @@ public interface TxManager extends IgniteComponent {
*/
CompletableFuture<Void> cleanup(String node, UUID txId);
+ /** Locally vacuums transactional resources that are no longer needed, such as
txnState (both persistent and volatile). */
+ void vacuum();
+
/**
* Returns a number of finished transactions.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 7cb0917710..58bf55228b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -41,6 +41,8 @@ public class TxStateMeta implements TransactionMeta {
private final HybridTimestamp commitTimestamp;
+ private final Long initialVacuumObservationTimestamp;
+
/**
* Constructor.
*
@@ -55,7 +57,7 @@ public class TxStateMeta implements TransactionMeta {
@Nullable TablePartitionId commitPartitionId,
@Nullable HybridTimestamp commitTimestamp
) {
- this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, 0);
+ this(txState, txCoordinatorId, commitPartitionId, commitTimestamp,
null);
}
/**
@@ -65,19 +67,20 @@ public class TxStateMeta implements TransactionMeta {
* @param txCoordinatorId Transaction coordinator id.
* @param commitPartitionId Commit partition replication group id.
* @param commitTimestamp Commit timestamp.
- * @param lastAbandonedMarkerTs Timestamp indicates when the transaction
is marked as abandoned.
+ * @param initialVacuumObservationTimestamp Initial vacuum observation
timestamp.
*/
- private TxStateMeta(
+ public TxStateMeta(
TxState txState,
@Nullable String txCoordinatorId,
@Nullable TablePartitionId commitPartitionId,
@Nullable HybridTimestamp commitTimestamp,
- long lastAbandonedMarkerTs
+ @Nullable Long initialVacuumObservationTimestamp
) {
this.txState = txState;
this.txCoordinatorId = txCoordinatorId;
this.commitPartitionId = commitPartitionId;
this.commitTimestamp = commitTimestamp;
+ this.initialVacuumObservationTimestamp =
initialVacuumObservationTimestamp;
}
/**
@@ -118,6 +121,10 @@ public class TxStateMeta implements TransactionMeta {
return commitTimestamp;
}
+ public @Nullable Long initialVacuumObservationTimestamp() {
+ return initialVacuumObservationTimestamp;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
index 84e447cb43..016a1d3b79 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.tx.configuration;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.annotation.Value;
@@ -44,4 +45,9 @@ public class TransactionConfigurationSchema {
@Range(min = 0)
@Value(hasDefault = true)
public final int attemptsObtainLock = 3;
+
+ /** Transaction resource time to live (ms), the minimum lifetime of a
transaction state. */
+ @Value(hasDefault = true)
+ @Range(min = 0)
+ public long txnResourceTtl = TimeUnit.SECONDS.toMillis(30);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
index bac0426475..2e9c9feb06 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.CompletableFuture.allOf;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -32,6 +34,8 @@ import org.apache.ignite.network.TopologyService;
* Keeps track of all finished RO transactions.
*/
public class FinishedReadOnlyTransactionTracker {
+ private static final IgniteLogger LOG =
Loggers.forClass(FinishedReadOnlyTransactionTracker.class);
+
/** Tx messages factory. */
private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
@@ -65,18 +69,25 @@ public class FinishedReadOnlyTransactionTracker {
* Send close cursors batch message to all cluster nodes.
*/
public void broadcastClosedTransactions() {
- Collection<UUID> txToSend =
transactionInflights.finishedReadOnlyTransactions();
+ try {
+ Collection<UUID> txToSend =
transactionInflights.finishedReadOnlyTransactions();
+
+ if (!txToSend.isEmpty()) {
+ FinishedTransactionsBatchMessage message =
FACTORY.finishedTransactionsBatchMessage()
+ .transactions(txToSend)
+ .build();
- if (!txToSend.isEmpty()) {
- FinishedTransactionsBatchMessage message =
FACTORY.finishedTransactionsBatchMessage()
- .transactions(txToSend)
- .build();
+ CompletableFuture<?>[] messages = topologyService.allMembers()
+ .stream()
+ .map(clusterNode ->
sendCursorCleanupCommand(clusterNode, message))
+ .toArray(CompletableFuture[]::new);
+ allOf(messages).thenRun(() ->
transactionInflights.removeTxContexts(txToSend));
+ }
+ } catch (Throwable err) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-21829 Use
failure handler instead.
+ LOG.error("Error occurred during broadcasting closed
transactions.", err);
- CompletableFuture<?>[] messages = topologyService.allMembers()
- .stream()
- .map(clusterNode -> sendCursorCleanupCommand(clusterNode,
message))
- .toArray(CompletableFuture[]::new);
- allOf(messages).thenRun(() ->
transactionInflights.removeTxContexts(txToSend));
+ throw err;
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
similarity index 68%
rename from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
rename to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
index cb51853794..564a871414 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
@@ -22,7 +22,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +32,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.network.TopologyService;
@@ -40,17 +40,17 @@ import org.apache.ignite.network.TopologyService;
/**
* Manager responsible for cleaning up the transaction resources.
*/
-public class ResourceCleanupManager implements IgniteComponent {
+public class ResourceVacuumManager implements IgniteComponent {
/** The logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(ResourceCleanupManager.class);
+ private static final IgniteLogger LOG =
Loggers.forClass(ResourceVacuumManager.class);
- private static final int RESOURCE_CLEANUP_EXECUTOR_SIZE = 1;
+ private static final int RESOURCE_VACUUM_EXECUTOR_SIZE = 1;
/** System property name. */
- public static final String RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY
= "RESOURCE_CLEANUP_INTERVAL_MILLISECONDS";
+ public static final String RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY
= "RESOURCE_VACUUM_INTERVAL_MILLISECONDS";
- private final int resourceCleanupIntervalMilliseconds =
IgniteSystemProperties
- .getInteger(RESOURCE_CLEANUP_INTERVAL_MILLISECONDS_PROPERTY,
30_000);
+ private final int resourceVacuumIntervalMilliseconds =
IgniteSystemProperties
+ .getInteger(RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY,
30_000);
private final FinishedReadOnlyTransactionTracker
finishedReadOnlyTransactionTracker;
@@ -61,12 +61,14 @@ public class ResourceCleanupManager implements
IgniteComponent {
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- private final ScheduledExecutorService resourceCleanupExecutor;
+ private final ScheduledExecutorService resourceVacuumExecutor;
private final RemotelyTriggeredResourceRegistry resourceRegistry;
private final ClusterNodeResolver clusterNodeResolver;
+ private final TxManager txManager;
+
/**
* Constructor.
*
@@ -75,19 +77,21 @@ public class ResourceCleanupManager implements
IgniteComponent {
* @param topologyService Topology service.
* @param messagingService Messaging service.
* @param transactionInflights Transaction inflights.
+ * @param txManager Transactional manager.
*/
- public ResourceCleanupManager(
+ public ResourceVacuumManager(
String nodeName,
RemotelyTriggeredResourceRegistry resourceRegistry,
TopologyService topologyService,
MessagingService messagingService,
- TransactionInflights transactionInflights
+ TransactionInflights transactionInflights,
+ TxManager txManager
) {
this.resourceRegistry = resourceRegistry;
this.clusterNodeResolver = topologyService;
- this.resourceCleanupExecutor = Executors.newScheduledThreadPool(
- RESOURCE_CLEANUP_EXECUTOR_SIZE,
- NamedThreadFactory.create(nodeName,
"resource-cleanup-executor", LOG)
+ this.resourceVacuumExecutor = Executors.newScheduledThreadPool(
+ RESOURCE_VACUUM_EXECUTOR_SIZE,
+ NamedThreadFactory.create(nodeName,
"resource-vacuum-executor", LOG)
);
this.finishedReadOnlyTransactionTracker = new
FinishedReadOnlyTransactionTracker(
topologyService,
@@ -95,22 +99,24 @@ public class ResourceCleanupManager implements
IgniteComponent {
transactionInflights
);
this.finishedTransactionBatchRequestHandler =
- new FinishedTransactionBatchRequestHandler(messagingService,
resourceRegistry, resourceCleanupExecutor);
+ new FinishedTransactionBatchRequestHandler(messagingService,
resourceRegistry, resourceVacuumExecutor);
+
+ this.txManager = txManager;
}
@Override
public CompletableFuture<Void> start() {
- resourceCleanupExecutor.scheduleAtFixedRate(
- this::runCleanupOperations,
+ resourceVacuumExecutor.scheduleAtFixedRate(
+ this::runVacuumOperations,
0,
- resourceCleanupIntervalMilliseconds,
+ resourceVacuumIntervalMilliseconds,
TimeUnit.MILLISECONDS
);
- resourceCleanupExecutor.scheduleAtFixedRate(
+ resourceVacuumExecutor.scheduleAtFixedRate(
finishedReadOnlyTransactionTracker::broadcastClosedTransactions,
0,
- resourceCleanupIntervalMilliseconds,
+ resourceVacuumIntervalMilliseconds,
TimeUnit.MILLISECONDS
);
@@ -123,23 +129,15 @@ public class ResourceCleanupManager implements
IgniteComponent {
public void stop() throws Exception {
busyLock.block();
- shutdownAndAwaitTermination(resourceCleanupExecutor, 10,
TimeUnit.SECONDS);
- }
-
- /**
- * Is called on the finish of read only transaction.
- *
- * @param id Transaction id.
- */
- void onReadOnlyTransactionFinished(UUID id) {
- finishedReadOnlyTransactionTracker.onTransactionFinished(id);
+ shutdownAndAwaitTermination(resourceVacuumExecutor, 10,
TimeUnit.SECONDS);
}
- private void runCleanupOperations() {
- inBusyLock(busyLock, this::cleanupOrphanTxResources);
+ private void runVacuumOperations() {
+ inBusyLock(busyLock, this::vacuumOrphanTxResources);
+ inBusyLock(busyLock, this::vacuumTxnResources);
}
- private void cleanupOrphanTxResources() {
+ private void vacuumOrphanTxResources() {
try {
Set<String> remoteHosts = resourceRegistry.registeredRemoteHosts();
@@ -149,9 +147,21 @@ public class ResourceCleanupManager implements
IgniteComponent {
}
}
} catch (Throwable err) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-21829 Use
failure handler instead.
LOG.error("Error occurred during the orphan resources closing.",
err);
throw err;
}
}
+
+ private void vacuumTxnResources() {
+ try {
+ txManager.vacuum();
+ } catch (Throwable err) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-21829 Use
failure handler instead.
+ LOG.error("Error occurred during txn resources vacuum.", err);
+
+ throw err;
+ }
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index c627246ec4..329fd6d3f3 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -196,9 +196,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
private final Executor partitionOperationsExecutor;
- /** Cleanup manager for tx resources. */
- private final ResourceCleanupManager resourceCleanupManager;
-
private final TransactionInflights transactionInflights;
/**
@@ -228,7 +225,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
LongSupplier idleSafeTimePropagationPeriodMsSupplier,
LocalRwTxCounter localRwTxCounter,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager,
TransactionInflights transactionInflights
) {
this(
@@ -245,7 +241,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
localRwTxCounter,
ForkJoinPool.commonPool(),
resourcesRegistry,
- resourceCleanupManager,
transactionInflights
);
}
@@ -281,7 +276,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
LocalRwTxCounter localRwTxCounter,
Executor partitionOperationsExecutor,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager,
TransactionInflights transactionInflights
) {
this.txConfig = txConfig;
@@ -295,7 +289,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
this.primaryReplicaEventListener = this::primaryReplicaEventListener;
this.localRwTxCounter = localRwTxCounter;
this.partitionOperationsExecutor = partitionOperationsExecutor;
- this.resourceCleanupManager = resourceCleanupManager;
this.transactionInflights = transactionInflights;
placementDriverHelper = new PlacementDriverHelper(placementDriver,
clockService);
@@ -771,6 +764,13 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return txCleanupRequestSender.cleanup(node, txId);
}
+ @Override
+ public void vacuum() {
+ long vacuumObservationTimestamp = System.currentTimeMillis();
+
+ txStateVolatileStorage.vacuum(vacuumObservationTimestamp,
txConfig.txnResourceTtl().value());
+ }
+
@Override
public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable
runnable) {
return runAsync(runnable, writeIntentSwitchPool);
@@ -787,7 +787,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
UUID txId = txIdAndTimestamp.getTxId();
- resourceCleanupManager.onReadOnlyTransactionFinished(txId);
+ transactionInflights.markReadOnlyTxFinished(txId);
return readOnlyTxFuture;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
index ea9f810a23..a575318ebe 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -23,7 +23,10 @@ import static
org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.jetbrains.annotations.Nullable;
@@ -32,6 +35,7 @@ import org.jetbrains.annotations.Nullable;
* The class represents volatile transaction state storage that stores a
transaction state meta until the node stops.
*/
public class VolatileTxStateMetaStorage {
+ private static final IgniteLogger LOG =
Loggers.forClass(VolatileTxStateMetaStorage.class);
/** The local map for tx states. */
private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
@@ -100,4 +104,65 @@ public class VolatileTxStateMetaStorage {
public Collection<TxStateMeta> states() {
return txStateMap.values();
}
+
+ /**
+ * Locally vacuums no longer needed transactional resources.
+ * For each finished (COMMITTED or ABORTED) transaction:
+ * <ol>
+ * <li>Removes it from the volatile storage if {@code txnResourceTtl == 0} or if
+ * {@code txnState.initialVacuumObservationTimestamp + txnResourceTtl < vacuumObservationTimestamp}.</li>
+ * <li>Updates txnState.initialVacuumObservationTimestamp by setting
it to vacuumObservationTimestamp
+ * if it's not already initialized.</li>
+ * </ol>
+ *
+ * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
+ * @param txnResourceTtl Transactional resource time to live in
milliseconds.
+ */
+ public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+ LOG.info("Vacuum started [vacuumObservationTimestamp={},
txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
+
+ AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
+ AtomicInteger markedAsInitiallyDetectedTxnsCount = new
AtomicInteger(0);
+ AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
+ AtomicInteger skippedFotFurtherProcessingUnfinishedTxnsCount = new
AtomicInteger(0);
+
+ txStateMap.forEach((txId, meta) -> {
+ txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
+ if (TxState.isFinalState(meta0.txState())) {
+ if (txnResourceTtl == 0) {
+ vacuumizedTxnsCount.incrementAndGet();
+ return null;
+ } else if (meta0.initialVacuumObservationTimestamp() ==
null) {
+ markedAsInitiallyDetectedTxnsCount.incrementAndGet();
+ return new TxStateMeta(
+ meta0.txState(),
+ meta0.txCoordinatorId(),
+ meta0.commitPartitionId(),
+ meta0.commitTimestamp(),
+ vacuumObservationTimestamp
+ );
+ } else if (meta0.initialVacuumObservationTimestamp() +
txnResourceTtl < vacuumObservationTimestamp) {
+ vacuumizedTxnsCount.incrementAndGet();
+ return null;
+ } else {
+ alreadyMarkedTxnsCount.incrementAndGet();
+ return meta0;
+ }
+ } else {
+
skippedFotFurtherProcessingUnfinishedTxnsCount.incrementAndGet();
+ return meta0;
+ }
+ });
+ });
+
+ LOG.info("Vacuum finished [vacuumObservationTimestamp={},
txnResourceTtl={}, vacuumizedTxnsCount={},"
+ + " markedAsInitiallyDetectedTxnsCount={},
alreadyMarkedTxnsCount={}, skippedFotFurtherProcessingUnfinishedTxnsCount={}].",
+ vacuumObservationTimestamp,
+ txnResourceTtl,
+ vacuumizedTxnsCount,
+ markedAsInitiallyDetectedTxnsCount,
+ alreadyMarkedTxnsCount,
+ skippedFotFurtherProcessingUnfinishedTxnsCount
+ );
+ }
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index ada690c777..12d5ac964d 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -76,7 +76,6 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
-import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -146,14 +145,6 @@ public class TxManagerTest extends IgniteAbstractTest {
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
- ResourceCleanupManager cleanupManager = new ResourceCleanupManager(
- LOCAL_NODE.name(),
- resourceRegistry,
- clusterService.topologyService(),
- clusterService.messagingService(),
- transactionInflights
- );
-
txManager = new TxManagerImpl(
txConfiguration,
clusterService,
@@ -165,7 +156,6 @@ public class TxManagerTest extends IgniteAbstractTest {
idleSafeTimePropagationPeriodMsSupplier,
localRwTxCounter,
resourceRegistry,
- cleanupManager,
transactionInflights
);