This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bdde52ff81 IGNITE-20395 Clean up write intents for RW transaction on
primary (#2679)
bdde52ff81 is described below
commit bdde52ff81dfc7f114148bfad62fdb235055f52d
Author: Cyrill <[email protected]>
AuthorDate: Fri Oct 20 10:31:29 2023 +0300
IGNITE-20395 Clean up write intents for RW transaction on primary (#2679)
---
...xDistributedTestSingleNodeNoCleanupMessage.java | 17 ---
.../replicator/PartitionReplicaListener.java | 117 +++++++++++++++++----
2 files changed, 99 insertions(+), 35 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 9591e0c604..f7451b9167 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -72,9 +72,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
/** A list of background cleanup futures. */
private final List<CompletableFuture<?>> cleanupFutures = new
CopyOnWriteArrayList<>();
- /** A flag to drop async cleanup actions. */
- private volatile boolean ignoreAsyncCleanup;
-
/**
* The constructor.
*
@@ -115,9 +112,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
) {
@Override
public CompletableFuture<Void>
executeCleanupAsync(Runnable runnable) {
- if (ignoreAsyncCleanup) {
- return completedFuture(null);
- }
CompletableFuture<Void> cleanupFuture =
super.executeCleanupAsync(runnable);
cleanupFutures.add(cleanupFuture);
@@ -213,14 +207,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
super.testTransactionAlreadyCommitted();
}
- @Disabled("IGNITE-20395")
- @Test
- @Override
- public void testBalance() throws InterruptedException {
- super.testBalance();
- }
-
- @Disabled("IGNITE-20395")
@Test
public void testTwoReadWriteTransactions() throws TransactionException {
Tuple key = makeKey(1);
@@ -228,9 +214,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
assertFalse(accounts.recordView().delete(null, key));
assertNull(accounts.recordView().get(null, key));
- // Disable background cleanup to avoid a race.
- ignoreAsyncCleanup = true;
-
InternalTransaction tx1 = (InternalTransaction)
igniteTransactions.begin();
accounts.recordView().upsert(tx1, makeValue(1, 100.));
tx1.commit();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 745f83a2b9..c7b238fcfa 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -177,6 +177,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5;
+ private static final CompletableFuture<?> COMPLETED_EMPTY =
completedFuture(null);
+
/** Factory to create RAFT command messages. */
private static final TableMessagesFactory MSG_FACTORY = new
TableMessagesFactory();
@@ -232,6 +234,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final ConcurrentMap<UUID, TxCleanupReadyFutureList>
txCleanupReadyFutures = new ConcurrentHashMap<>();
+ /** Cleanup futures. */
+ private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap
= new ConcurrentHashMap<>();
+
private final SchemaCompatValidator schemaCompatValidator;
/** Instance of the local node. */
@@ -1880,6 +1885,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
Map<UUID, BinaryRowMessage> rowIdsToDelete = new
HashMap<>();
// TODO:IGNITE-20669 Replace the result to BitSet.
Collection<BinaryRow> result = new ArrayList<>();
+ List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRows.size(); i++) {
RowId lockedRowId = deleteExactLockFuts[i].join();
@@ -1888,6 +1894,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
rowIdsToDelete.put(lockedRowId.uuid(), null);
result.add(new NullBinaryRow());
+
+ rows.add(lockedRowId);
} else {
result.add(null);
}
@@ -1898,6 +1906,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion -> awaitCleanup(rows,
catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
request,
@@ -2013,11 +2022,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
return allOf(rowIdFuts).thenCompose(ignore -> {
List<BinaryRowMessage> searchRowMessages =
request.binaryRowMessages();
Map<UUID, BinaryRowMessage> rowsToUpdate =
IgniteUtils.newHashMap(searchRowMessages.size());
+ List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRowMessages.size(); i++) {
RowId lockedRow = rowIdFuts[i].join().get1();
rowsToUpdate.put(lockedRow.uuid(),
searchRowMessages.get(i));
+
+ rows.add(lockedRow);
}
if (rowsToUpdate.isEmpty()) {
@@ -2025,6 +2037,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion -> awaitCleanup(rows,
catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
request,
@@ -2121,6 +2134,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
Map<UUID, BinaryRowMessage> rowIdsToDelete = new
HashMap<>();
// TODO:IGNITE-20669 Replace the result to BitSet.
Collection<BinaryRow> result = new ArrayList<>();
+ List<RowId> rows = new ArrayList<>();
for (CompletableFuture<RowId> lockFut : rowIdLockFuts) {
RowId lockedRowId = lockFut.join();
@@ -2128,6 +2142,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (lockedRowId != null) {
rowIdsToDelete.put(lockedRowId.uuid(), null);
+ rows.add(lockedRowId);
+
result.add(new NullBinaryRow());
} else {
result.add(null);
@@ -2139,6 +2155,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion -> awaitCleanup(rows,
catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
rowIdsToDelete,
@@ -2251,7 +2268,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
true,
null,
null,
- cmd.lastCommitTimestamp());
+ null);
updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
}
@@ -2276,7 +2293,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
false,
null,
cmd.safeTime(),
- cmd.lastCommitTimestamp());
+ null);
return null;
});
@@ -2362,7 +2379,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
true,
null,
null,
- cmd.lastCommitTimestamps());
+ emptyMap());
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
}
@@ -2389,7 +2406,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
true,
null,
null,
- cmd.lastCommitTimestamps());
+ emptyMap());
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
}
@@ -2412,7 +2429,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
false,
null,
cmd.safeTime(),
- cmd.lastCommitTimestamps());
+ emptyMap());
return null;
});
@@ -2495,6 +2512,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion ->
awaitCleanup(validatedRowId, catalogVersion))
.thenCompose(
catalogVersion ->
applyUpdateCommand(
request,
@@ -2550,6 +2568,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return lockFut
.thenCompose(rowIdLock ->
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion ->
awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion ->
applyUpdateCommand(
request,
@@ -2581,6 +2600,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return lockFut
.thenCompose(rowIdLock ->
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion ->
awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion ->
applyUpdateCommand(
request,
@@ -2608,6 +2628,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForUpdate(searchRow, rowId, txId)
.thenCompose(rowIdLock ->
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion ->
awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion ->
applyUpdateCommand(
request,
@@ -2635,6 +2656,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForUpdate(searchRow, rowId, txId)
.thenCompose(rowIdLock ->
validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(catalogVersion ->
awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion ->
applyUpdateCommand(
request,
@@ -2696,6 +2718,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForDelete(row, rowId, txId)
.thenCompose(rowLock ->
validateOperationAgainstSchema(request.transactionId()))
+ .thenCompose(catalogVersion -> awaitCleanup(rowId,
catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
request.commitPartitionId().asTablePartitionId(),
@@ -2719,6 +2742,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForDelete(row, rowId, txId)
.thenCompose(ignored ->
validateOperationAgainstSchema(request.transactionId()))
+ .thenCompose(catalogVersion -> awaitCleanup(rowId,
catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
request.commitPartitionId().asTablePartitionId(),
@@ -2741,6 +2765,48 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
+ /**
+ * Wait for the async cleanup of the provided row to finish.
+ *
+ * @param rowId Row ID of the existing row that the transaction affects.
+ * @param result The value that the returned future will wrap.
+ *
+ * @param <T> Type of the {@code result}.
+ */
+ private <T> CompletableFuture<T> awaitCleanup(@Nullable RowId rowId, T
result) {
+ return (rowId == null ? COMPLETED_EMPTY :
rowCleanupMap.getOrDefault(rowId, COMPLETED_EMPTY))
+ .thenApply(ignored -> result);
+ }
+
+ /**
+ * Wait for the async cleanup of the provided rows to finish.
+ *
+ * @param rowIds Row Ids of existing rows that the transaction affects.
+ * @param result The value that the returned future will wrap.
+ *
+ * @param <T> Type of the {@code result}.
+ */
+ private <T> CompletableFuture<T> awaitCleanup(Collection<RowId> rowIds, T
result) {
+ if (rowCleanupMap.isEmpty()) {
+ return completedFuture(result);
+ }
+
+ List<CompletableFuture<?>> list = new ArrayList<>(rowIds.size());
+
+ for (RowId rowId : rowIds) {
+ CompletableFuture<?> completableFuture = rowCleanupMap.get(rowId);
+ if (completableFuture != null) {
+ list.add(completableFuture);
+ }
+ }
+ if (list.isEmpty()) {
+ return completedFuture(result);
+ }
+
+ return allOf(list.toArray(new CompletableFuture[0]))
+ .thenApply(unused -> result);
+ }
+
/**
* Extracts a binary tuple corresponding to a part of the row comprised of
PK columns.
*
@@ -2920,6 +2986,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return validateOperationAgainstSchema(txId)
+ .thenCompose(catalogVersion ->
awaitCleanup(rowIdLock.get1(), catalogVersion))
.thenCompose(
catalogVersion ->
applyUpdateCommand(
commitPartitionId.asTablePartitionId(),
@@ -3085,13 +3152,21 @@ public class PartitionReplicaListener implements
ReplicaListener {
return inBusyLockAsync(busyLock, () ->
resolveWriteIntentReadability(readResult, timestamp)
.thenApply(writeIntentReadable ->
- inBusyLock(busyLock, () ->
- // TODO: If the write intent is
readable - it's either committed or aborted.
- // Local (primary) cleanup should be
done first.
- //
https://issues.apache.org/jira/browse/IGNITE-20395
- // Once the cleanup is performed, the
timestamp should be added to
- // `new
TimedBinaryRow(readResult.binaryRow())`.
- writeIntentReadable ? new
TimedBinaryRow(readResult.binaryRow()) : lastCommitted.get()
+ inBusyLock(busyLock, () -> {
+ if (writeIntentReadable) {
+ // Even though this readResult
is still a write intent entry in the storage
+ // (therefore it contains
txId), we already know it relates to a committed transaction
+ // and will be cleaned up by
an asynchronous task
+ // started in
scheduleTransactionRowAsyncCleanup().
+ // So it's safe to assume that
this is the latest committed entry.
+ HybridTimestamp
commitTimestamp =
+
txManager.stateMeta(readResult.transactionId()).commitTimestamp();
+
+ return new
TimedBinaryRow(readResult.binaryRow(), commitTimestamp);
+ }
+
+ return lastCommitted.get();
+ }
)
)
);
@@ -3122,13 +3197,19 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Both normal cleanup and single row cleanup are using
txsPendingRowIds map to store write intents.
// So we don't need a separate method to handle single row case.
- txManager.executeCleanupAsync(() ->
- inBusyLock(busyLock, () ->
storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED,
commitTimestamp))
- ).exceptionally(e -> {
- LOG.warn("Failed to complete transaction cleanup command [txId=" +
txId + ']', e);
-
- return null;
+ CompletableFuture<?> future = rowCleanupMap.computeIfAbsent(rowId, k
-> {
+ // The cleanup for this row may already have been triggered. For
example, we are resolving a write intent for an RW transaction
+ // and a concurrent RO transaction resolves the same row, hence
computeIfAbsent.
+ return txManager.executeCleanupAsync(() ->
+ inBusyLock(busyLock, () ->
storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED,
commitTimestamp))
+ ).whenComplete((unused, e) -> {
+ if (e != null) {
+ LOG.warn("Failed to complete transaction cleanup command
[txId=" + txId + ']', e);
+ }
+ });
});
+
+ future.handle((v, e) -> rowCleanupMap.remove(rowId, future));
}
/**