This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e1e208a6ab IGNITE-20953 Move cleanup out of the TX locks (#2870)
e1e208a6ab is described below

commit e1e208a6ab968b54664e975fab6837fa02a74ee6
Author: Cyrill <[email protected]>
AuthorDate: Mon Nov 27 10:59:27 2023 +0300

    IGNITE-20953 Move cleanup out of the TX locks (#2870)
---
 .../replicator/PartitionReplicaListener.java       | 86 ++++++++++++++--------
 1 file changed, 54 insertions(+), 32 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 543967ff77..ed862682fa 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1710,11 +1710,38 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         markFinished(request.txId(), txState, request.commitTimestamp());
 
+        return awaitCleanupReadyFutures(request.txId(), request.commit())
+                .thenCompose(res -> {
+                    if (res.hadUpdateFutures()) {
+                        HybridTimestamp commandTimestamp = hybridClock.now();
+
+                        return reliableCatalogVersionFor(commandTimestamp)
+                                .thenCompose(catalogVersion ->
+                                        applyCleanupCommand(
+                                                request.txId(),
+                                                request.commit(),
+                                                request.commitTimestamp(),
+                                                request.commitTimestampLong(),
+                                                catalogVersion
+                                        ))
+                                .thenApply(unused -> res);
+                    } else {
+                        return completedFuture(res);
+                    }
+                })
+                .thenAccept(res -> {
+                    if (res.hadUpdateFutures() || res.hadReadFutures()) {
+                        releaseTxLocks(request.txId());
+                    }
+                });
+    }
+
+    private CompletableFuture<FuturesCleanupResult> 
awaitCleanupReadyFutures(UUID txId, boolean commit) {
         List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
         List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-18617
-        txCleanupReadyFutures.compute(request.txId(), (id, txOps) -> {
+        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
             if (txOps == null) {
                 return null;
             }
@@ -1732,32 +1759,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return txOps;
         });
 
-        if (txUpdateFutures.isEmpty()) {
-            if (!txReadFutures.isEmpty()) {
-                return allOffFuturesExceptionIgnored(txReadFutures, request)
-                        .thenRun(() -> releaseTxLocks(request.txId()));
-            }
-
-            return completedFuture(null);
-        }
-
-        return allOffFuturesExceptionIgnored(txUpdateFutures, 
request).thenCompose(v -> {
-            HybridTimestamp commandTimestamp = hybridClock.now();
-
-            return reliableCatalogVersionFor(commandTimestamp)
-                    .thenCompose(catalogVersion -> {
-                        applyCleanupCommand(
-                                request.txId(),
-                                request.commit(),
-                                request.commitTimestamp(),
-                                request.commitTimestampLong(),
-                                catalogVersion
-                        );
-
-                        return allOffFuturesExceptionIgnored(txReadFutures, 
request)
-                                .thenRun(() -> releaseTxLocks(request.txId()));
-                    });
-        });
+        return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
+                .thenCompose(v -> allOfFuturesExceptionIgnored(txReadFutures, 
commit, txId))
+                .thenApply(v -> new 
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
     }
 
     private CompletableFuture<Void> applyCleanupCommand(
@@ -1803,15 +1807,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Creates a future that waits all transaction operations are completed.
      *
      * @param txFutures Transaction operation futures.
-     * @param request Cleanup request.
+     * @param commit If {@code true} this is a commit otherwise a rollback.
+     * @param txId Transaction id.
      * @return The future completes when all futures in passed list are 
completed.
      */
-    private static CompletableFuture<Void> 
allOffFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures,
-            TxCleanupReplicaRequest request) {
+    private static CompletableFuture<Void> 
allOfFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures, boolean 
commit, UUID txId) {
         return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
                 .exceptionally(e -> {
-                    assert !request.commit() :
-                            "Transaction is committing, but an operation has 
completed with exception [txId=" + request.txId()
+                    assert !commit :
+                            "Transaction is committing, but an operation has 
completed with exception [txId=" + txId
                                     + ", err=" + e.getMessage() + ']';
 
                     return null;
@@ -3757,4 +3761,22 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .finish(request.finish())
                 .build();
     }
+
+    private static class FuturesCleanupResult {
+        private final boolean hadReadFutures;
+        private final boolean hadUpdateFutures;
+
+        public FuturesCleanupResult(boolean hadReadFutures, boolean 
hadUpdateFutures) {
+            this.hadReadFutures = hadReadFutures;
+            this.hadUpdateFutures = hadUpdateFutures;
+        }
+
+        public boolean hadReadFutures() {
+            return hadReadFutures;
+        }
+
+        public boolean hadUpdateFutures() {
+            return hadUpdateFutures;
+        }
+    }
 }

Reply via email to