This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bdde52ff81 IGNITE-20395 Clean up write intents for RW transaction on 
primary (#2679)
bdde52ff81 is described below

commit bdde52ff81dfc7f114148bfad62fdb235055f52d
Author: Cyrill <[email protected]>
AuthorDate: Fri Oct 20 10:31:29 2023 +0300

    IGNITE-20395 Clean up write intents for RW transaction on primary (#2679)
---
 ...xDistributedTestSingleNodeNoCleanupMessage.java |  17 ---
 .../replicator/PartitionReplicaListener.java       | 117 +++++++++++++++++----
 2 files changed, 99 insertions(+), 35 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 9591e0c604..f7451b9167 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -72,9 +72,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
     /** A list of background cleanup futures. */
     private final List<CompletableFuture<?>> cleanupFutures = new 
CopyOnWriteArrayList<>();
 
-    /** A flag to drop async cleanup actions.  */
-    private volatile boolean ignoreAsyncCleanup;
-
     /**
      * The constructor.
      *
@@ -115,9 +112,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
                 ) {
                     @Override
                     public CompletableFuture<Void> 
executeCleanupAsync(Runnable runnable) {
-                        if (ignoreAsyncCleanup) {
-                            return completedFuture(null);
-                        }
                         CompletableFuture<Void> cleanupFuture = 
super.executeCleanupAsync(runnable);
 
                         cleanupFutures.add(cleanupFuture);
@@ -213,14 +207,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
         super.testTransactionAlreadyCommitted();
     }
 
-    @Disabled("IGNITE-20395")
-    @Test
-    @Override
-    public void testBalance() throws InterruptedException {
-        super.testBalance();
-    }
-
-    @Disabled("IGNITE-20395")
     @Test
     public void testTwoReadWriteTransactions() throws TransactionException {
         Tuple key = makeKey(1);
@@ -228,9 +214,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
         assertFalse(accounts.recordView().delete(null, key));
         assertNull(accounts.recordView().get(null, key));
 
-        // Disable background cleanup to avoid a race.
-        ignoreAsyncCleanup = true;
-
         InternalTransaction tx1 = (InternalTransaction) 
igniteTransactions.begin();
         accounts.recordView().upsert(tx1, makeValue(1, 100.));
         tx1.commit();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 745f83a2b9..c7b238fcfa 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -177,6 +177,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5;
 
+    private static final CompletableFuture<?> COMPLETED_EMPTY = 
completedFuture(null);
+
     /** Factory to create RAFT command messages. */
     private static final TableMessagesFactory MSG_FACTORY = new 
TableMessagesFactory();
 
@@ -232,6 +234,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final ConcurrentMap<UUID, TxCleanupReadyFutureList> 
txCleanupReadyFutures = new ConcurrentHashMap<>();
 
+    /** Cleanup futures. */
+    private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap 
= new ConcurrentHashMap<>();
+
     private final SchemaCompatValidator schemaCompatValidator;
 
     /** Instance of the local node. */
@@ -1880,6 +1885,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     Map<UUID, BinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
                     // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
+                    List<RowId> rows = new ArrayList<>();
 
                     for (int i = 0; i < searchRows.size(); i++) {
                         RowId lockedRowId = deleteExactLockFuts[i].join();
@@ -1888,6 +1894,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             rowIdsToDelete.put(lockedRowId.uuid(), null);
 
                             result.add(new NullBinaryRow());
+
+                            rows.add(lockedRowId);
                         } else {
                             result.add(null);
                         }
@@ -1898,6 +1906,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return 
validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(catalogVersion -> awaitCleanup(rows, 
catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
                                             request,
@@ -2013,11 +2022,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return allOf(rowIdFuts).thenCompose(ignore -> {
                     List<BinaryRowMessage> searchRowMessages = 
request.binaryRowMessages();
                     Map<UUID, BinaryRowMessage> rowsToUpdate = 
IgniteUtils.newHashMap(searchRowMessages.size());
+                    List<RowId> rows = new ArrayList<>();
 
                     for (int i = 0; i < searchRowMessages.size(); i++) {
                         RowId lockedRow = rowIdFuts[i].join().get1();
 
                         rowsToUpdate.put(lockedRow.uuid(), 
searchRowMessages.get(i));
+
+                        rows.add(lockedRow);
                     }
 
                     if (rowsToUpdate.isEmpty()) {
@@ -2025,6 +2037,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return 
validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(catalogVersion -> awaitCleanup(rows, 
catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
                                             request,
@@ -2121,6 +2134,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     Map<UUID, BinaryRowMessage> rowIdsToDelete = new 
HashMap<>();
                     // TODO:IGNITE-20669 Replace the result to BitSet.
                     Collection<BinaryRow> result = new ArrayList<>();
+                    List<RowId> rows = new ArrayList<>();
 
                     for (CompletableFuture<RowId> lockFut : rowIdLockFuts) {
                         RowId lockedRowId = lockFut.join();
@@ -2128,6 +2142,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         if (lockedRowId != null) {
                             rowIdsToDelete.put(lockedRowId.uuid(), null);
 
+                            rows.add(lockedRowId);
+
                             result.add(new NullBinaryRow());
                         } else {
                             result.add(null);
@@ -2139,6 +2155,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return 
validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(catalogVersion -> awaitCleanup(rows, 
catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
                                             rowIdsToDelete,
@@ -2251,7 +2268,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 true,
                                 null,
                                 null,
-                                cmd.lastCommitTimestamp());
+                                null);
 
                         updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
                     }
@@ -2276,7 +2293,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             false,
                             null,
                             cmd.safeTime(),
-                            cmd.lastCommitTimestamp());
+                            null);
 
                     return null;
                 });
@@ -2362,7 +2379,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     true,
                                     null,
                                     null,
-                                    cmd.lastCommitTimestamps());
+                                    emptyMap());
 
                             
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                         }
@@ -2389,7 +2406,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     true,
                                     null,
                                     null,
-                                    cmd.lastCommitTimestamps());
+                                    emptyMap());
 
                             
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
                         }
@@ -2412,7 +2429,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             false,
                             null,
                             cmd.safeTime(),
-                            cmd.lastCommitTimestamps());
+                            emptyMap());
 
                     return null;
                 });
@@ -2495,6 +2512,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 }
 
                                 return 
validateOperationAgainstSchema(request.transactionId())
+                                        .thenCompose(catalogVersion -> 
awaitCleanup(validatedRowId, catalogVersion))
                                         .thenCompose(
                                                 catalogVersion -> 
applyUpdateCommand(
                                                         request,
@@ -2550,6 +2568,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(rowIdLock -> 
validateOperationAgainstSchema(request.transactionId())
+                                    .thenCompose(catalogVersion -> 
awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> 
applyUpdateCommand(
                                                     request,
@@ -2581,6 +2600,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(rowIdLock -> 
validateOperationAgainstSchema(request.transactionId())
+                                    .thenCompose(catalogVersion -> 
awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> 
applyUpdateCommand(
                                                     request,
@@ -2608,6 +2628,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(rowIdLock -> 
validateOperationAgainstSchema(request.transactionId())
+                                    .thenCompose(catalogVersion -> 
awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> 
applyUpdateCommand(
                                                     request,
@@ -2635,6 +2656,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(rowIdLock -> 
validateOperationAgainstSchema(request.transactionId())
+                                    .thenCompose(catalogVersion -> 
awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> 
applyUpdateCommand(
                                                     request,
@@ -2696,6 +2718,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForDelete(row, rowId, txId)
                             .thenCompose(rowLock -> 
validateOperationAgainstSchema(request.transactionId()))
+                            .thenCompose(catalogVersion -> awaitCleanup(rowId, 
catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateCommand(
                                             
request.commitPartitionId().asTablePartitionId(),
@@ -2719,6 +2742,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForDelete(row, rowId, txId)
                             .thenCompose(ignored -> 
validateOperationAgainstSchema(request.transactionId()))
+                            .thenCompose(catalogVersion -> awaitCleanup(rowId, 
catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateCommand(
                                             
request.commitPartitionId().asTablePartitionId(),
@@ -2741,6 +2765,48 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
     }
 
+    /**
+     * Wait for the async cleanup of the provided row to finish.
+     *
+     * @param rowId Row ID of the existing row that the transaction affects.
+     * @param result The value that the returned future will wrap.
+     *
+     * @param <T> Type of the {@code result}.
+     */
+    private <T> CompletableFuture<T> awaitCleanup(@Nullable RowId rowId, T 
result) {
+        return (rowId == null ? COMPLETED_EMPTY : 
rowCleanupMap.getOrDefault(rowId, COMPLETED_EMPTY))
+                .thenApply(ignored -> result);
+    }
+
+    /**
+     * Wait for the async cleanup of the provided rows to finish.
+     *
+     * @param rowIds Row Ids of existing rows that the transaction affects.
+     * @param result The value that the returned future will wrap.
+     *
+     * @param <T> Type of the {@code result}.
+     */
+    private <T> CompletableFuture<T> awaitCleanup(Collection<RowId> rowIds, T 
result) {
+        if (rowCleanupMap.isEmpty()) {
+            return completedFuture(result);
+        }
+
+        List<CompletableFuture<?>> list = new ArrayList<>(rowIds.size());
+
+        for (RowId rowId : rowIds) {
+            CompletableFuture<?> completableFuture = rowCleanupMap.get(rowId);
+            if (completableFuture != null) {
+                list.add(completableFuture);
+            }
+        }
+        if (list.isEmpty()) {
+            return completedFuture(result);
+        }
+
+        return allOf(list.toArray(new CompletableFuture[0]))
+                .thenApply(unused -> result);
+    }
+
     /**
      * Extracts a binary tuple corresponding to a part of the row comprised of 
PK columns.
      *
@@ -2920,6 +2986,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             }
 
                             return validateOperationAgainstSchema(txId)
+                                    .thenCompose(catalogVersion -> 
awaitCleanup(rowIdLock.get1(), catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> 
applyUpdateCommand(
                                                     
commitPartitionId.asTablePartitionId(),
@@ -3085,13 +3152,21 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return inBusyLockAsync(busyLock, () ->
                 resolveWriteIntentReadability(readResult, timestamp)
                         .thenApply(writeIntentReadable ->
-                                inBusyLock(busyLock, () ->
-                                        // TODO: If the write intent is 
readable - it's either committed or aborted.
-                                        //  Local (primary) cleanup should be 
done first.
-                                        //  
https://issues.apache.org/jira/browse/IGNITE-20395
-                                        //  Once the cleanup is performed, the 
timestamp should be added to
-                                        //  `new 
TimedBinaryRow(readResult.binaryRow())`.
-                                        writeIntentReadable ? new 
TimedBinaryRow(readResult.binaryRow()) : lastCommitted.get()
+                                inBusyLock(busyLock, () -> {
+                                            if (writeIntentReadable) {
+                                                // Even though this readResult 
is still a write intent entry in the storage
+                                                // (therefore it contains 
txId), we already know it relates to a committed transaction
+                                                // and will be cleaned up by 
an asynchronous task
+                                                // started in 
scheduleTransactionRowAsyncCleanup().
+                                                // So it's safe to assume that 
this is the latest committed entry.
+                                                HybridTimestamp 
commitTimestamp =
+                                                        
txManager.stateMeta(readResult.transactionId()).commitTimestamp();
+
+                                                return new 
TimedBinaryRow(readResult.binaryRow(), commitTimestamp);
+                                            }
+
+                                            return lastCommitted.get();
+                                        }
                                 )
                         )
         );
@@ -3122,13 +3197,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         // Both normal cleanup and single row cleanup are using 
txsPendingRowIds map to store write intents.
         // So we don't need a separate method to handle single row case.
-        txManager.executeCleanupAsync(() ->
-                inBusyLock(busyLock, () -> 
storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, 
commitTimestamp))
-        ).exceptionally(e -> {
-            LOG.warn("Failed to complete transaction cleanup command [txId=" + 
txId + ']', e);
-
-            return null;
+        CompletableFuture<?> future = rowCleanupMap.computeIfAbsent(rowId, k 
-> {
+            // The cleanup for this row may have already been triggered. For 
example, we are resolving a write intent for an RW transaction
+            // and a concurrent RO transaction resolves the same row, hence 
computeIfAbsent.
+            return txManager.executeCleanupAsync(() ->
+                    inBusyLock(busyLock, () -> 
storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, 
commitTimestamp))
+            ).whenComplete((unused, e) -> {
+                if (e != null) {
+                    LOG.warn("Failed to complete transaction cleanup command 
[txId=" + txId + ']', e);
+                }
+            });
         });
+
+        future.handle((v, e) -> rowCleanupMap.remove(rowId, future));
     }
 
     /**

Reply via email to