This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eb5651679b IGNITE-22288 Fix the vacuum of persistent tx state before
tx cleanup completion (#3799)
eb5651679b is described below
commit eb5651679ba708b9f01ef34d9b1683b49dae22dd
Author: Denis Chudov <[email protected]>
AuthorDate: Thu May 30 09:29:55 2024 +0300
IGNITE-22288 Fix the vacuum of persistent tx state before tx cleanup
completion (#3799)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 4 +-
.../internal/table/ItTxResourcesVacuumTest.java | 1 +
.../replicator/PartitionReplicaListener.java | 14 +++--
.../replication/PartitionReplicaListenerTest.java | 8 +--
.../org/apache/ignite/internal/tx/TxManager.java | 7 ++-
.../tx/impl/PersistentTxStateVacuumizer.java | 63 +++++++++++++++++++---
.../internal/tx/impl/TxCleanupRequestSender.java | 48 +++++++++++------
.../ignite/internal/tx/impl/TxManagerImpl.java | 10 ++--
.../tx/impl/VolatileTxStateMetaStorage.java | 30 +++++++----
.../apache/ignite/internal/tx/TxCleanupTest.java | 6 +--
10 files changed, 140 insertions(+), 51 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 53af23ec3d..dc03d55bf6 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -190,6 +190,7 @@ public class FakeTxManager implements TxManager {
@Override
public CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Map<TablePartitionId, String> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -200,6 +201,7 @@ public class FakeTxManager implements TxManager {
@Override
public CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Collection<TablePartitionId> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -209,7 +211,7 @@ public class FakeTxManager implements TxManager {
}
@Override
- public CompletableFuture<Void> cleanup(String node, UUID txId) {
+ public CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId,
String node, UUID txId) {
return nullCompletedFuture();
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index cd40ce6168..56ac729434 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -800,6 +800,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
if (expected == null) {
assertNull(actual);
} else {
+ assertNotNull(actual);
assertEquals(expected.stringValue("val"),
actual.stringValue("val"));
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index f2b1e092b7..b38ddbc09c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -421,6 +421,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
txManager.cleanup(
+ replicationGroupId,
txMeta.enlistedPartitions(),
txMeta.txState() == COMMITTED,
txMeta.commitTimestamp(),
@@ -507,7 +508,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Check whether a transaction has already been finished.
if (txMeta != null && isFinalState(txMeta.txState())) {
- return runCleanupOnNode(txId, senderId);
+ // Tx recovery message is processed on the commit partition.
+ return runCleanupOnNode(replicationGroupId, txId, senderId);
}
LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].",
txId, txMeta);
@@ -518,14 +520,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
/**
* Run cleanup on a node.
*
+ * @param commitPartitionId Commit partition id.
* @param txId Transaction id.
* @param nodeId Node id (inconsistent).
*/
- private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId)
{
+ private CompletableFuture<Void> runCleanupOnNode(TablePartitionId
commitPartitionId, UUID txId, String nodeId) {
// Get node id of the sender to send back cleanup requests.
String nodeConsistentId =
clusterNodeResolver.getConsistentIdById(nodeId);
- return nodeConsistentId == null ? nullCompletedFuture() :
txManager.cleanup(nodeConsistentId, txId);
+ return nodeConsistentId == null ? nullCompletedFuture() :
txManager.cleanup(commitPartitionId, nodeConsistentId, txId);
}
/**
@@ -540,13 +543,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
// is sent in a common durable manner to a partition that have
initiated recovery.
return txManager.finish(
new HybridTimestampTracker(),
+ // Tx recovery is executed on the commit partition.
replicationGroupId,
false,
// Enlistment consistency token is not required for
the rollback, so it is 0L.
Map.of(replicationGroupId, new
IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)),
txId
)
- .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
+ .whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId,
txId, senderId));
}
/**
@@ -1694,7 +1698,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return finishTransaction(enlistedPartitions.keySet(), txId, commit,
commitTimestamp)
.thenCompose(txResult ->
- txManager.cleanup(enlistedPartitions, commit,
commitTimestamp, txId)
+ txManager.cleanup(replicationGroupId,
enlistedPartitions, commit, commitTimestamp, txId)
.thenApply(v -> txResult)
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 015285ba95..73ec92e0d4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1669,7 +1669,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<?> beginAndAbortTx() {
- when(txManager.cleanup(any(Map.class), anyBoolean(), any(),
any())).thenReturn(nullCompletedFuture());
+ when(txManager.cleanup(any(), any(Map.class), anyBoolean(), any(),
any())).thenReturn(nullCompletedFuture());
HybridTimestamp beginTimestamp = clock.now();
UUID txId = transactionIdFor(beginTimestamp);
@@ -1731,7 +1731,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<?> beginAndCommitTx() {
- when(txManager.cleanup(any(Map.class), anyBoolean(), any(),
any())).thenReturn(nullCompletedFuture());
+ when(txManager.cleanup(any(), any(Map.class), anyBoolean(), any(),
any())).thenReturn(nullCompletedFuture());
HybridTimestamp beginTimestamp = clock.now();
UUID txId = transactionIdFor(beginTimestamp);
@@ -2072,7 +2072,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
doAnswer(invocation ->
nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(),
any(), any());
- doAnswer(invocation ->
nullCompletedFuture()).when(txManager).cleanup(anyString(), any());
+ doAnswer(invocation ->
nullCompletedFuture()).when(txManager).cleanup(any(), anyString(), any());
}
private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType
requestType, RwListenerInvocation listenerInvocation) {
@@ -2473,7 +2473,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.thenReturn(List.of(
tableSchema(CURRENT_SCHEMA_VERSION,
List.of(nullableColumn("col")))
));
- when(txManager.cleanup(any(Map.class), anyBoolean(), any(),
any())).thenReturn(nullCompletedFuture());
+ when(txManager.cleanup(any(), any(Map.class), anyBoolean(), any(),
any())).thenReturn(nullCompletedFuture());
AtomicReference<Boolean> committed = interceptFinishTxCommand();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index badc077651..de59c157a4 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -145,6 +145,7 @@ public interface TxManager extends IgniteComponent {
*
* <p>The nodes to send the request to are taken from the mapping
`partition id -> partition primary`.
*
+ * @param commitPartitionId Commit partition id.
* @param enlistedPartitions Map of partition groups to their primary
nodes.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -152,6 +153,7 @@ public interface TxManager extends IgniteComponent {
* @return Completable future of Void.
*/
CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Map<TablePartitionId, String> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -163,6 +165,7 @@ public interface TxManager extends IgniteComponent {
*
* <p>The nodes to send the request to are calculated by the placement
driver.
*
+ * @param commitPartitionId Commit partition id.
* @param enlistedPartitions Enlisted partition groups.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -170,6 +173,7 @@ public interface TxManager extends IgniteComponent {
* @return Completable future of Void.
*/
CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Collection<TablePartitionId> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -179,11 +183,12 @@ public interface TxManager extends IgniteComponent {
/**
* Sends cleanup request to the nodes that initiated recovery.
*
+ * @param commitPartitionId Commit partition id.
* @param node Target node.
* @param txId Transaction id.
* @return Completable future of Void.
*/
- CompletableFuture<Void> cleanup(String node, UUID txId);
+ CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId, String
node, UUID txId);
/**
* Locally vacuums no longer needed transactional resources, like txnState
both persistent and volatile.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 52beab9d7d..4ef589a24f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -17,17 +17,20 @@
package org.apache.ignite.internal.tx.impl;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -39,6 +42,7 @@ import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissExcepti
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
/**
* Implements the logic of persistent tx states vacuum.
@@ -79,12 +83,13 @@ public class PersistentTxStateVacuumizer {
/**
* Vacuum persistent tx states.
*
- * @param txIds Transaction ids to vacuum; map of commit partition ids to
sets of tx ids.
- * @return A future, result is the set of successfully vacuumized txn
states.
+ * @param txIds Transaction ids to vacuum; map of commit partition ids to
sets of {@link VacuumizableTx}.
+ * @return A future, result is the set of successfully processed txn
states and count of persistent states that were vacuumized.
*/
- public CompletableFuture<Set<UUID>>
vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
+ public CompletableFuture<PersistentTxStateVacuumResult>
vacuumPersistentTxStates(Map<TablePartitionId, Set<VacuumizableTx>> txIds) {
Set<UUID> successful = ConcurrentHashMap.newKeySet();
List<CompletableFuture<?>> futures = new ArrayList<>();
+ AtomicInteger vacuumizedPersistentTxnStatesCount = new AtomicInteger();
HybridTimestamp now = clockService.now();
txIds.forEach((commitPartitionId, txs) -> {
@@ -95,15 +100,29 @@ public class PersistentTxStateVacuumizer {
// timestamp) would be updated there, and then this
operation would be called from there.
// Also, we are going to send the vacuum request only
to the local node.
if (replicaMeta != null &&
localNode.id().equals(replicaMeta.getLeaseholderId())) {
+ // Filter txns to define those which persistent
states can be vacuumized.
+ Set<UUID> filteredTxIds = new HashSet<>();
+
+ for (VacuumizableTx v : txs) {
+ if (v.cleanupCompletionTimestamp == null) {
+ // If there is no cleanup completion
timestamp, add the tx id to successful set to remove the
+ // volatile state. Persistent state should
be preserved until tx cleanup is complete.
+ successful.add(v.txId);
+ } else {
+ filteredTxIds.add(v.txId);
+ }
+ }
+
VacuumTxStateReplicaRequest request =
TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
.groupId(commitPartitionId)
- .transactionIds(txs)
+ .transactionIds(filteredTxIds)
.build();
return replicaService.invoke(localNode,
request).whenComplete((v, e) -> {
if (e == null) {
- successful.addAll(txs);
+ successful.addAll(filteredTxIds);
+
vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size());
// We can log the exceptions without
further handling because failed requests' txns are not added
// to the set of successful and will be
retried. PrimaryReplicaMissException can be considered as
// a part of regular flow and doesn't need
to be logged.
@@ -114,7 +133,7 @@ public class PersistentTxStateVacuumizer {
}
});
} else {
- successful.addAll(txs);
+ successful.addAll(txs.stream().map(v ->
v.txId).collect(toSet()));
return nullCompletedFuture();
}
@@ -124,6 +143,36 @@ public class PersistentTxStateVacuumizer {
});
return allOf(futures.toArray(new CompletableFuture[0]))
- .handle((unused, unusedEx) -> successful);
+ .handle((unused, unusedEx) -> new
PersistentTxStateVacuumResult(successful,
vacuumizedPersistentTxnStatesCount.get()));
+ }
+
+ /**
+ * Class representing the vacuumizable tx context, actually a pair of the
tx id and the cleanup completion timestamp.
+ */
+ public static class VacuumizableTx {
+ final UUID txId;
+
+ @Nullable
+ final Long cleanupCompletionTimestamp;
+
+ VacuumizableTx(UUID txId, @Nullable Long cleanupCompletionTimestamp) {
+ this.txId = txId;
+ this.cleanupCompletionTimestamp = cleanupCompletionTimestamp;
+ }
+ }
+
+ /**
+ * Result of the persistent tx state vacuum operation.
+ */
+ public static class PersistentTxStateVacuumResult {
+ /** Transaction IDs that are processed by the persistent vacuumizer
and can be removed from volatile storage. */
+ final Set<UUID> txnsToVacuum;
+
+ final int vacuumizedPersistentTxnStatesCount;
+
+ public PersistentTxStateVacuumResult(Set<UUID> txnsToVacuum, int
vacuumizedPersistentTxnStatesCount) {
+ this.txnsToVacuum = txnsToVacuum;
+ this.vacuumizedPersistentTxnStatesCount =
vacuumizedPersistentTxnStatesCount;
+ }
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 82cfee4fab..3e434e9b28 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -110,19 +110,19 @@ public class TxCleanupRequestSender {
});
if (ctx != null && ctx.partitions.isEmpty()) {
- markTxnCleanupReplicated(info.txId(), ctx.txState);
+ markTxnCleanupReplicated(info.txId(), ctx.txState,
ctx.commitPartitionId);
writeIntentsReplicated.remove(info.txId());
}
}
- private void markTxnCleanupReplicated(UUID txId, TxState state) {
+ private void markTxnCleanupReplicated(UUID txId, TxState state,
TablePartitionId commitPartitionId) {
long cleanupCompletionTimestamp = System.currentTimeMillis();
txStateVolatileStorage.updateMeta(txId, oldMeta ->
new TxStateMeta(oldMeta == null ? state : oldMeta.txState(),
oldMeta == null ? null : oldMeta.txCoordinatorId(),
- oldMeta == null ? null : oldMeta.commitPartitionId(),
+ commitPartitionId,
oldMeta == null ? null : oldMeta.commitTimestamp(),
oldMeta == null ? null :
oldMeta.initialVacuumObservationTimestamp(),
cleanupCompletionTimestamp)
@@ -132,17 +132,19 @@ public class TxCleanupRequestSender {
/**
* Sends unlock request to the nodes that initiated recovery.
*
+ * @param commitPartitionId Commit partition id.
* @param node Target node.
* @param txId Transaction id.
* @return Completable future of Void.
*/
- public CompletableFuture<Void> cleanup(String node, UUID txId) {
- return sendCleanupMessageWithRetries(false, null, txId, node, null);
+ public CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId,
String node, UUID txId) {
+ return sendCleanupMessageWithRetries(commitPartitionId, false, null,
txId, node, null);
}
/**
* Sends cleanup request to the primary nodes of each one of {@code
partitions}.
*
+ * @param commitPartitionId Commit partition id.
* @param enlistedPartitions Map of enlisted partition group to the
initial primary node.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -150,24 +152,29 @@ public class TxCleanupRequestSender {
* @return Completable future of Void.
*/
public CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Map<TablePartitionId, String> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
// Start tracking the partitions we want to learn the replication
confirmation from.
- writeIntentsReplicated.put(txId, new
CleanupContext(enlistedPartitions.keySet(), commit ? TxState.COMMITTED :
TxState.ABORTED));
+ writeIntentsReplicated.put(
+ txId,
+ new CleanupContext(commitPartitionId,
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
+ );
Map<String, Set<TablePartitionId>> partitions = new HashMap<>();
enlistedPartitions.forEach((partitionId, nodeId) ->
partitions.computeIfAbsent(nodeId, node -> new
HashSet<>()).add(partitionId));
- return cleanupPartitions(partitions, commit, commitTimestamp, txId);
+ return cleanupPartitions(commitPartitionId, partitions, commit,
commitTimestamp, txId);
}
/**
* Gets primary nodes for each of the provided {@code partitions} and
sends cleanup request to each one.
*
+ * @param commitPartitionId Commit partition id.
* @param partitionIds Collection of enlisted partition groups.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -175,23 +182,29 @@ public class TxCleanupRequestSender {
* @return Completable future of Void.
*/
public CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Collection<TablePartitionId> partitionIds,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
// Start tracking the partitions we want to learn the replication
confirmation from.
- writeIntentsReplicated.put(txId, new CleanupContext(new
HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED));
+ writeIntentsReplicated.put(
+ txId,
+ new CleanupContext(commitPartitionId, new
HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED)
+ );
return placementDriverHelper.findPrimaryReplicas(partitionIds)
.thenCompose(partitionData -> {
- cleanupPartitionsWithoutPrimary(commit, commitTimestamp,
txId, partitionData.partitionsWithoutPrimary);
+ cleanupPartitionsWithoutPrimary(commitPartitionId, commit,
commitTimestamp, txId,
+ partitionData.partitionsWithoutPrimary);
- return cleanupPartitions(partitionData.partitionsByNode,
commit, commitTimestamp, txId);
+ return cleanupPartitions(commitPartitionId,
partitionData.partitionsByNode, commit, commitTimestamp, txId);
});
}
private void cleanupPartitionsWithoutPrimary(
+ TablePartitionId commitPartitionId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
@@ -200,10 +213,11 @@ public class TxCleanupRequestSender {
// For the partitions without primary, we need to wait until a new
primary is found.
// Then we can proceed with the common cleanup flow.
placementDriverHelper.awaitPrimaryReplicas(noPrimaryFound)
- .thenCompose(partitionsByNode ->
cleanupPartitions(partitionsByNode, commit, commitTimestamp, txId));
+ .thenCompose(partitionsByNode ->
cleanupPartitions(commitPartitionId, partitionsByNode, commit, commitTimestamp,
txId));
}
private CompletableFuture<Void> cleanupPartitions(
+ TablePartitionId commitPartitionId,
Map<String, Set<TablePartitionId>> partitionsByNode,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -215,13 +229,14 @@ public class TxCleanupRequestSender {
String node = entry.getKey();
Set<TablePartitionId> nodePartitions = entry.getValue();
- cleanupFutures.add(sendCleanupMessageWithRetries(commit,
commitTimestamp, txId, node, nodePartitions));
+
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node, nodePartitions));
}
return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
}
private CompletableFuture<Void> sendCleanupMessageWithRetries(
+ TablePartitionId commitPartitionId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
@@ -244,12 +259,12 @@ public class TxCleanupRequestSender {
if (partitions == null) {
// If we don't have any partition, which is
the recovery case,
// just try again with the same node.
- return sendCleanupMessageWithRetries(commit,
commitTimestamp, txId, node, partitions);
+ return
sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId,
node, partitions);
}
// Run a cleanup that finds new primaries for the
given partitions.
// This covers the case when a partition primary
died and we still want to switch write intents.
- return cleanup(partitions, commit,
commitTimestamp, txId);
+ return cleanup(commitPartitionId, partitions,
commit, commitTimestamp, txId);
}
return CompletableFuture.<Void>failedFuture(throwable);
@@ -261,6 +276,7 @@ public class TxCleanupRequestSender {
}
private static class CleanupContext {
+ private final TablePartitionId commitPartitionId;
/**
* The partitions that we have not received write intent replication
confirmation for.
@@ -272,9 +288,9 @@ public class TxCleanupRequestSender {
*/
private final TxState txState;
- private CleanupContext(Set<TablePartitionId> partitions, TxState
txState) {
+ private CleanupContext(TablePartitionId commitPartitionId,
Set<TablePartitionId> partitions, TxState txState) {
+ this.commitPartitionId = commitPartitionId;
this.partitions = partitions;
-
this.txState = txState;
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index e3ee44521b..077920549f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -822,27 +822,29 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
@Override
public CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Map<TablePartitionId, String> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
- return txCleanupRequestSender.cleanup(enlistedPartitions, commit,
commitTimestamp, txId);
+ return txCleanupRequestSender.cleanup(commitPartitionId,
enlistedPartitions, commit, commitTimestamp, txId);
}
@Override
public CompletableFuture<Void> cleanup(
+ TablePartitionId commitPartitionId,
Collection<TablePartitionId> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
- return txCleanupRequestSender.cleanup(enlistedPartitions, commit,
commitTimestamp, txId);
+ return txCleanupRequestSender.cleanup(commitPartitionId,
enlistedPartitions, commit, commitTimestamp, txId);
}
@Override
- public CompletableFuture<Void> cleanup(String node, UUID txId) {
- return txCleanupRequestSender.cleanup(node, txId);
+ public CompletableFuture<Void> cleanup(TablePartitionId commitPartitionId,
String node, UUID txId) {
+ return txCleanupRequestSender.cleanup(commitPartitionId, node, txId);
}
@Override
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
index d0e65a9e43..49ec7bc2da 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
+import
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult;
+import
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VacuumizableTx;
import org.jetbrains.annotations.Nullable;
/**
@@ -115,11 +117,19 @@ public class VolatileTxStateMetaStorage {
}
/**
- * Locally vacuums no longer needed transactional resource.
+ * Locally vacuums no longer needed transactional resource. Also calls
{@code persistentVacuumOp} to vacuum some persistent states.
* For each finished (COMMITTED or ABORTED) transactions:
* <ol>
- * <li> Removes it from the volatile storage if txnResourcesTTL == 0
or if
- * txnState.initialVacuumObservationTimestamp + txnResourcesTTL <
vacuumObservationTimestamp.</li>
+ * <li> Takes every suitable txn state from the volatile storage if
txnResourcesTTL == 0 or if
+ * max(txnState.initialVacuumObservationTimestamp,
txnState.cleanupCompletionTimestamp) + txnResourcesTTL <
+ * vacuumObservationTimestamp. If txnState.cleanupCompletionTimestamp
is {@code null}, then only
+ * txnState.initialVacuumObservationTimestamp is used.</li>
+ * <li>If txnState.commitPartitionId is {@code null}, then only
volatile state is vacuumized, if not {@code null} this
+ * means we are probably on commit partition and this txnState should
be passed to {@code persistentVacuumOp} to vacuumize the
+ * persistent state as well. Only after such txn states are processed
by {@code persistentVacuumOp} we can remove the volatile
+ * txn state. Only states which are marked by {@code
persistentVacuumOp} as successfully vacuumized and
+ * {@code cleanupCompletionTimestamp} is the same that was passed to
{@code persistentVacuumOp} can be removed, to prevent
+ * races.</li>
* <li>Updates txnState.initialVacuumObservationTimestamp by setting
it to vacuumObservationTimestamp
* if it's not already initialized.</li>
* </ol>
@@ -133,7 +143,7 @@ public class VolatileTxStateMetaStorage {
public CompletableFuture<Void> vacuum(
long vacuumObservationTimestamp,
long txnResourceTtl,
- Function<Map<TablePartitionId, Set<UUID>>,
CompletableFuture<Set<UUID>>> persistentVacuumOp
+ Function<Map<TablePartitionId, Set<VacuumizableTx>>,
CompletableFuture<PersistentTxStateVacuumResult>> persistentVacuumOp
) {
LOG.info("Vacuum started [vacuumObservationTimestamp={},
txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
@@ -142,7 +152,7 @@ public class VolatileTxStateMetaStorage {
AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
AtomicInteger skippedForFurtherProcessingUnfinishedTxnsCount = new
AtomicInteger(0);
- Map<TablePartitionId, Set<UUID>> txIds = new HashMap<>();
+ Map<TablePartitionId, Set<VacuumizableTx>> txIds = new HashMap<>();
Map<UUID, Long> cleanupCompletionTimestamps = new HashMap<>();
txStateMap.forEach((txId, meta) -> {
@@ -166,8 +176,8 @@ public class VolatileTxStateMetaStorage {
return null;
} else {
- Set<UUID> ids =
txIds.computeIfAbsent(meta0.commitPartitionId(), k -> new HashSet<>());
- ids.add(txId);
+ Set<VacuumizableTx> ids =
txIds.computeIfAbsent(meta0.commitPartitionId(), k -> new HashSet<>());
+ ids.add(new VacuumizableTx(txId,
cleanupCompletionTimestamp));
if (cleanupCompletionTimestamp != null) {
cleanupCompletionTimestamps.put(txId,
cleanupCompletionTimestamp);
@@ -189,8 +199,8 @@ public class VolatileTxStateMetaStorage {
});
return persistentVacuumOp.apply(txIds)
- .thenAccept(successful -> {
- for (UUID txId : successful) {
+ .thenAccept(vacuumResult -> {
+ for (UUID txId : vacuumResult.txnsToVacuum) {
txStateMap.compute(txId, (k, v) -> {
if (v == null) {
return null;
@@ -218,7 +228,7 @@ public class VolatileTxStateMetaStorage {
vacuumObservationTimestamp,
txnResourceTtl,
vacuumizedTxnsCount,
- successful.size(),
+ vacuumResult.vacuumizedPersistentTxnStatesCount,
markedAsInitiallyDetectedTxnsCount,
alreadyMarkedTxnsCount,
skippedForFurtherProcessingUnfinishedTxnsCount
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 918f849558..bee98ad32e 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -146,7 +146,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
HybridTimestamp commitTimestamp = clock.now();
- CompletableFuture<Void> cleanup =
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+ CompletableFuture<Void> cleanup =
cleanupRequestSender.cleanup(tablePartitionId1, partitions, true,
commitTimestamp, txId);
assertThat(cleanup, willCompleteSuccessfully());
@@ -182,7 +182,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
HybridTimestamp commitTimestamp = clock.now();
- CompletableFuture<Void> cleanup =
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+ CompletableFuture<Void> cleanup =
cleanupRequestSender.cleanup(tablePartitionId1, partitions, true,
commitTimestamp, txId);
assertThat(cleanup, willCompleteSuccessfully());
@@ -216,7 +216,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
HybridTimestamp commitTimestamp = clock.now();
- CompletableFuture<Void> cleanup =
cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId);
+ CompletableFuture<Void> cleanup =
cleanupRequestSender.cleanup(tablePartitionId1, partitions, true,
commitTimestamp, txId);
assertThat(cleanup, willCompleteSuccessfully());