This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4291a209a21 IGNITE-27721 Replace TX cleanup futures with counter-based inflights tracker (#7565)
4291a209a21 is described below

commit 4291a209a210ecba93167de0945f021cb0a9382b
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Feb 20 17:40:27 2026 +0200

    IGNITE-27721 Replace TX cleanup futures with counter-based inflights tracker (#7565)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   6 +-
 .../partition/replicator/FuturesCleanupResult.java |  24 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |  14 +-
 .../internal/table/ItOperationRetryTest.java       |   2 +-
 .../replicator/PartitionReplicaListener.java       | 319 ++++++++++-----------
 .../replication/PartitionReplicaListenerTest.java  |   5 +-
 .../ZonePartitionReplicaListenerTest.java          |   5 +-
 .../org/apache/ignite/internal/tx/TxManager.java   |   8 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   5 +-
 9 files changed, 188 insertions(+), 200 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 139c939ae3b..4955eb80d04 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -230,8 +232,8 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable 
runnable) {
-        return CompletableFuture.runAsync(runnable);
+    public Executor writeIntentSwitchExecutor() {
+        return ForkJoinPool.commonPool();
     }
 
     @Override
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
index 6996b41c39c..e913fef6d74 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
@@ -21,30 +21,14 @@ package org.apache.ignite.internal.partition.replicator;
  * Contains result of cleanup futures await.
  */
 public class FuturesCleanupResult {
-    private final boolean hadReadFutures;
-    private final boolean hadUpdateFutures;
-    private final boolean forceCleanup;
+    private final boolean cleanupNeeded;
 
     /** Constructor. */
-    public FuturesCleanupResult(boolean hadReadFutures, boolean 
hadUpdateFutures, boolean forceCleanup) {
-        this.hadReadFutures = hadReadFutures;
-        this.hadUpdateFutures = hadUpdateFutures;
-        this.forceCleanup = forceCleanup;
-    }
-
-    public boolean hadReadFutures() {
-        return hadReadFutures;
-    }
-
-    public boolean hadUpdateFutures() {
-        return hadUpdateFutures;
-    }
-
-    public boolean forceCleanup() {
-        return forceCleanup;
+    public FuturesCleanupResult(boolean cleanupNeeded) {
+        this.cleanupNeeded = cleanupNeeded;
     }
 
     public boolean shouldApplyWriteIntent() {
-        return hadUpdateFutures() || forceCleanup();
+        return cleanupNeeded;
     }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index b3cc95884c5..f332642e032 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.distributed;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -159,12 +160,11 @@ public class 
ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
                         new TestMetricManager()
                 ) {
                     @Override
-                    public CompletableFuture<Void> 
executeWriteIntentSwitchAsync(Runnable runnable) {
-                        CompletableFuture<Void> cleanupFuture = 
super.executeWriteIntentSwitchAsync(runnable);
-
-                        cleanupFutures.add(cleanupFuture);
-
-                        return cleanupFuture;
+                    public Executor writeIntentSwitchExecutor() {
+                        return r -> {
+                            CompletableFuture<Void> cleanupFuture = 
runAsync(r, super.writeIntentSwitchExecutor());
+                            cleanupFutures.add(cleanupFuture);
+                        };
                     }
                 };
             }
@@ -231,7 +231,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                                     txManager.lockManager()
                             );
 
-                            FuturesCleanupResult cleanupResult = new 
FuturesCleanupResult(false, false, false);
+                            FuturesCleanupResult cleanupResult = new 
FuturesCleanupResult(false);
                             return completedFuture(new 
ReplicaResult(cleanupResult, null));
                         }
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
index 657c744e0aa..147eab42b50 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
@@ -167,7 +167,7 @@ public class ItOperationRetryTest extends 
ClusterPerTestIntegrationTest {
         txCoordInhibitor.stopInhibit();
         startNode(leaseholderNodeIdx);
 
-        // Waiting unitl lease will be granted for the group and the 
coordinator should be able to get the primary replica for the retry.
+        // Waiting until lease will be granted for the group and the 
coordinator should be able to get the primary replica for the retry.
         waitAndGetPrimaryReplica(transactionCoordinatorNode,  
partitionGroupId);
 
         // Finally we expect the upsert will succeed eventually because of 
retry with new leaseholder enlisted.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 26872817b0d..ff0706ad805 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
@@ -78,6 +79,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -288,7 +291,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
     private final Supplier<Map<Integer, IndexLocker>> indexesLockers;
 
-    private final ConcurrentMap<UUID, TxCleanupReadyFutureList> 
txCleanupReadyFutures = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, TxCleanupReadyState> 
txCleanupReadyFutures = new ConcurrentHashMap<>();
 
     /** Cleanup futures. */
     private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap 
= new ConcurrentHashMap<>();
@@ -502,11 +505,9 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
-
             return appendTxCommand(
                     req.transactionId(),
-                    opId,
+                    req.requestType(),
                     req.full(),
                     () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -514,11 +515,9 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
             var req = (ReadWriteSingleRowPkReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
-
             return appendTxCommand(
                     req.transactionId(),
-                    opId,
+                    req.requestType(),
                     req.full(),
                     () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -526,11 +525,9 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
             var req = (ReadWriteMultiRowReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
-
             return appendTxCommand(
                     req.transactionId(),
-                    opId,
+                    req.requestType(),
                     req.full(),
                     () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -538,11 +535,9 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
             var req = (ReadWriteMultiRowPkReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
-
             return appendTxCommand(
                     req.transactionId(),
-                    opId,
+                    req.requestType(),
                     req.full(),
                     () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -550,11 +545,9 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
             var req = (ReadWriteSwapRowReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
-
             return appendTxCommand(
                     req.transactionId(),
-                    opId,
+                    req.requestType(),
                     req.full(),
                     () -> processTwoEntriesAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -573,10 +566,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     .txLabel(req.txLabel())
                     .build());
 
-            var opId = new OperationId(senderId, req.timestamp().longValue(), 
RW_SCAN);
-
             // Implicit RW scan can be committed locally on a last batch or 
error.
-            return appendTxCommand(req.transactionId(), opId, false, () -> 
processScanRetrieveBatchAction(req))
+            return appendTxCommand(req.transactionId(), RW_SCAN, false, () -> 
processScanRetrieveBatchAction(req))
                     .thenCompose(rows -> {
                         if (allElementsAreNull(rows)) {
                             return completedFuture(rows);
@@ -1430,7 +1421,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             ));
         }
 
-        return awaitCleanupReadyFutures(request.txId(), request.commit())
+        return awaitCleanupReadyFutures(request.txId())
                 .thenApply(res -> {
                     if (res.shouldApplyWriteIntent()) {
                         applyWriteIntentSwitchCommandLocally(request);
@@ -1440,42 +1431,36 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 });
     }
 
-    private CompletableFuture<FuturesCleanupResult> 
awaitCleanupReadyFutures(UUID txId, boolean commit) {
-        List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
-        List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
+    private CompletableFuture<FuturesCleanupResult> 
awaitCleanupReadyFutures(UUID txId) {
+        AtomicBoolean cleanupNeeded = new AtomicBoolean(true);
+        AtomicReference<CompletableFuture<Void>> cleanupReadyFutureRef = new 
AtomicReference<>(nullCompletedFuture());
 
-        AtomicBoolean forceCleanup = new AtomicBoolean(true);
-
-        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-            if (txOps == null) {
-                return null;
-            }
-
-            // Cleanup futures (both read and update) are empty in two cases:
+        txCleanupReadyFutures.compute(txId, (id, txCleanupState) -> {
+            // Cleanup operations (both read and update) aren't registered in 
two cases:
             // - there were no actions in the transaction
             // - write intent switch is being executed on the new primary (the 
primary has changed after write intent appeared)
             // Both cases are expected to happen extremely rarely so we are 
fine to force the write intent switch.
 
             // The reason for the forced switch is that otherwise write 
intents would not be switched (if there is no volatile state and
-            // FuturesCleanupResult.hadUpdateFutures() returns false).
-            forceCleanup.set(txOps.isEmpty());
+            // txCleanupState.hadWrites() returns false).
+            boolean forceCleanup = txCleanupState == null || 
!txCleanupState.hadAnyOperations();
 
-            txOps.futures.forEach((opId, future) -> {
-                if (opId.requestType.isRwRead()) {
-                    txReadFutures.add(future);
-                } else {
-                    txUpdateFutures.add(future);
-                }
-            });
+            if (txCleanupState == null) {
+                return null;
+            }
+
+            cleanupNeeded.set(txCleanupState.hadWrites() || forceCleanup);
 
-            txOps.clear();
+            CompletableFuture<Void> fut = 
txCleanupState.lockAndAwaitInflights();
+            cleanupReadyFutureRef.set(fut);
 
-            return null;
+            return txCleanupState;
         });
 
-        return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
-                .thenCompose(v -> allOfFuturesExceptionIgnored(txReadFutures, 
commit, txId))
-                .thenApply(v -> new 
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty(), 
forceCleanup.get()));
+        return cleanupReadyFutureRef.get()
+                .thenApplyAsync(v -> new 
FuturesCleanupResult(cleanupNeeded.get()), 
txManager.writeIntentSwitchExecutor())
+                // TODO https://issues.apache.org/jira/browse/IGNITE-27904 
proper cleanup.
+                .whenComplete((v, e) -> txCleanupReadyFutures.remove(txId));
     }
 
     private void 
applyWriteIntentSwitchCommandLocally(WriteIntentSwitchReplicaRequestBase 
request) {
@@ -1487,25 +1472,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         );
     }
 
-    /**
-     * Creates a future that waits all transaction operations are completed.
-     *
-     * @param txFutures Transaction operation futures.
-     * @param commit If {@code true} this is a commit otherwise a rollback.
-     * @param txId Transaction id.
-     * @return The future completes when all futures in passed list are 
completed.
-     */
-    private static CompletableFuture<Void> 
allOfFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures, boolean 
commit, UUID txId) {
-        return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
-                .exceptionally(e -> {
-                    assert !commit :
-                            "Transaction is committing, but an operation has 
completed with exception [txId=" + txId
-                                    + ", err=" + e.getMessage() + ']';
-
-                    return null;
-                });
-    }
-
     private void releaseTxLocks(UUID txId) {
         lockManager.releaseAll(txId);
     }
@@ -1582,14 +1548,14 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
      * Appends an operation to prevent the race between commit/rollback and 
the operation execution.
      *
      * @param txId Transaction id.
-     * @param opId Operation id.
+     * @param requestType Request type.
      * @param full {@code True} if a full transaction and can be immediately 
committed.
      * @param op Operation closure.
      * @return A future object representing the result of the given operation.
      */
     private <T> CompletableFuture<T> appendTxCommand(
             UUID txId,
-            OperationId opId,
+            RequestType requestType,
             boolean full,
             Supplier<CompletableFuture<T>> op
     ) {
@@ -1600,30 +1566,30 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             });
         }
 
-        var cleanupReadyFut = new CompletableFuture<Void>();
+        AtomicBoolean inflightStarted = new AtomicBoolean(false);
 
-        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+        TxCleanupReadyState txCleanupReadyState = 
txCleanupReadyFutures.compute(txId, (id, txCleanupState) -> {
             // First check whether the transaction has already been finished.
             // And complete cleanupReadyFut with exception if it is the case.
             TxStateMeta txStateMeta = txManager.stateMeta(txId);
 
             if (txStateMeta == null || isFinalState(txStateMeta.txState()) || 
txStateMeta.txState() == FINISHING) {
-                cleanupReadyFut.completeExceptionally(new Exception());
-
-                return txOps;
+                // Don't start inflight.
+                return txCleanupState;
             }
 
-            // Otherwise collect cleanupReadyFut in the transaction's futures.
-            if (txOps == null) {
-                txOps = new TxCleanupReadyFutureList();
+            // Otherwise start new inflight in txCleanupState.
+            if (txCleanupState == null) {
+                txCleanupState = new TxCleanupReadyState();
             }
 
-            txOps.putOrReplaceFuture(opId, cleanupReadyFut);
+            boolean started = txCleanupState.startInflight(requestType);
+            inflightStarted.set(started);
 
-            return txOps;
+            return txCleanupState;
         });
 
-        if (cleanupReadyFut.isCompletedExceptionally()) {
+        if (!inflightStarted.get()) {
             TxStateMeta txStateMeta = txManager.stateMeta(txId);
 
             TxState txState = txStateMeta == null ? null : 
txStateMeta.txState();
@@ -1639,26 +1605,25 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
         CompletableFuture<T> fut = op.get();
 
+        // If inflightStarted then txCleanupReadyState is not null.
+        requireNonNull(txCleanupReadyState, "txCleanupReadyState cannot be 
null here.");
+
         fut.whenComplete((v, th) -> {
             if (th != null) {
-                cleanupReadyFut.completeExceptionally(th);
+                txCleanupReadyState.completeInflight(txId);
             } else {
                 if (v instanceof ReplicaResult) {
                     ReplicaResult res = (ReplicaResult) v;
 
                     if (res.applyResult().replicationFuture() != null) {
                         
res.applyResult().replicationFuture().whenComplete((v0, th0) -> {
-                            if (th0 != null) {
-                                cleanupReadyFut.completeExceptionally(th0);
-                            } else {
-                                cleanupReadyFut.complete(null);
-                            }
+                            txCleanupReadyState.completeInflight(txId);
                         });
                     } else {
-                        cleanupReadyFut.complete(null);
+                        txCleanupReadyState.completeInflight(txId);
                     }
                 } else {
-                    cleanupReadyFut.complete(null);
+                    txCleanupReadyState.completeInflight(txId);
                 }
             }
         });
@@ -3458,19 +3423,24 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             // and a concurrent RO transaction resolves the same row, hence 
computeIfAbsent.
 
             // We don't need to take the partition snapshots read lock, see 
#INTERNAL_DOC_PLACEHOLDER why.
-            return txManager.executeWriteIntentSwitchAsync(() -> 
inBusyLock(busyLock,
-                    () -> storageUpdateHandler.switchWriteIntents(
-                            txId,
-                            txState == COMMITTED,
-                            commitTimestamp,
-                            indexIdsAtRwTxBeginTsOrNull(txId)
+            return runAsync(
+                            () -> inBusyLock(
+                                    busyLock,
+                                    () -> 
storageUpdateHandler.switchWriteIntents(
+                                            txId,
+                                            txState == COMMITTED,
+                                            commitTimestamp,
+                                            indexIdsAtRwTxBeginTsOrNull(txId)
+                                    )
+                            ),
+                            txManager.writeIntentSwitchExecutor()
                     )
-            )).whenComplete((unused, e) -> {
-                if (e != null && 
!ReplicatorRecoverableExceptions.isRecoverable(e)) {
-                    LOG.warn("Failed to complete transaction cleanup command 
{}", e,
-                            formatTxInfo(txId, txManager));
-                }
-            });
+                    .whenComplete((unused, e) -> {
+                        if (e != null && 
!ReplicatorRecoverableExceptions.isRecoverable(e)) {
+                            LOG.warn("Failed to complete transaction cleanup 
command {}", e,
+                                    formatTxInfo(txId, txManager));
+                        }
+                    });
         });
 
         future.handle((v, e) -> rowCleanupMap.remove(rowId, future));
@@ -3647,22 +3617,103 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
     }
 
     /**
-     * Class that stores a list of futures for operations that has happened in 
a specific transaction. Also, the class has a property
-     * {@code state} that represents a transaction state.
+     * Class that stores a counter of inflight operations for a transaction.
+     *
+     * <p>Synchronization model:
+     * <ul>
+     *     <li>{@code hadAnyOperations}, {@code hadWrites} — plain fields, 
only accessed inside {@code compute()} critical section.</li>
+     *     <li>{@code inflightOperationsCount} — {@link AtomicInteger}, 
cross-thread safe.</li>
+     *     <li>{@code completionFuture} — volatile, written from {@code 
compute()}, read cross-thread.
+     *         Non-null value also serves as the "locked" indicator (no new 
inflights accepted).</li>
+     * </ul>
      */
-    private static class TxCleanupReadyFutureList {
-        final Map<OperationId, CompletableFuture<?>> futures = new HashMap<>();
+    private static class TxCleanupReadyState {
+        // Only accessed inside compute() critical section.
+        boolean hadAnyOperations = false;
+        boolean hadWrites = false;
+
+        final AtomicInteger inflightOperationsCount = new AtomicInteger(0);
+
+        // Non-null means locked (no new inflights accepted). Written from 
compute(), read cross-thread.
+        volatile CompletableFuture<Void> completionFuture = null;
 
-        boolean isEmpty() {
-            return futures.isEmpty();
+        // Should be called inside critical section on transaction.
+        boolean hadAnyOperations() {
+            return hadAnyOperations;
         }
 
-        void clear() {
-            futures.clear();
+        // Should be called inside critical section on transaction.
+        boolean hadWrites() {
+            return hadWrites;
         }
 
-        void putOrReplaceFuture(OperationId opId, CompletableFuture<?> fut) {
-            futures.put(opId, fut);
+        // Should be called inside critical section on transaction.
+        CompletableFuture<Void> lockAndAwaitInflights() {
+            CompletableFuture<Void> f = completionFuture;
+
+            if (f != null) {
+                return f; // Already locked.
+            }
+
+            if (inflightOperationsCount.get() == 0) {
+                f = nullCompletedFuture();
+                completionFuture = f;
+                return f;
+            }
+
+            f = new CompletableFuture<>();
+            completionFuture = f;
+
+            // Recheck: a cross-thread completeInflight() may have decremented 
to 0
+            // before seeing completionFuture != null.
+            if (inflightOperationsCount.get() == 0) {
+                f.complete(null);
+            }
+
+            return f;
+        }
+
+        // Should be called inside critical section on transaction.
+        boolean startInflight(RequestType requestType) {
+            if (completionFuture != null) {
+                return false;
+            }
+
+            hadAnyOperations = true;
+
+            if (requestType.isWrite()) {
+                hadWrites = true;
+            }
+
+            inflightOperationsCount.incrementAndGet();
+
+            return true;
+        }
+
+        // Cross-thread.
+        void completeInflight(UUID txId) {
+            int remaining = inflightOperationsCount.decrementAndGet();
+
+            if (remaining < 0) {
+                LOG.error("Removed inflight when there were no inflights 
[txId={}]", txId);
+            }
+
+            if (remaining == 0) {
+                completeFutureIfAny();
+            }
+        }
+
+        private void completeFutureIfAny() {
+            CompletableFuture<Void> f = completionFuture;
+
+            if (f == null || f.isDone()) {
+                return;
+            }
+
+            // Double check inflightOperationsCount after locked, because we 
are outside of critical section.
+            if (inflightOperationsCount.get() == 0) {
+                f.complete(null);
+            }
         }
     }
 
@@ -3874,58 +3925,4 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
     public void cleanupLocally(UUID txId, boolean commit, @Nullable 
HybridTimestamp commitTimestamp) {
         storageUpdateHandler.switchWriteIntents(txId, commit, commitTimestamp, 
null);
     }
-
-    /**
-     * Operation unique identifier.
-     */
-    private static class OperationId {
-        /** Operation node initiator id. */
-        private final UUID initiatorId;
-
-        /** Timestamp. */
-        private final long ts;
-
-        /** Request typ. */
-        private final RequestType requestType;
-
-        /**
-         * The constructor.
-         *
-         * @param initiatorId Sender node id.
-         * @param ts Timestamp.
-         */
-        public OperationId(UUID initiatorId, long ts, RequestType requestType) 
{
-            this.initiatorId = initiatorId;
-            this.ts = ts;
-            this.requestType = requestType;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-
-            OperationId that = (OperationId) o;
-
-            if (ts != that.ts) {
-                return false;
-            }
-            if (requestType != that.requestType) {
-                return false;
-            }
-            return initiatorId.equals(that.initiatorId);
-        }
-
-        @Override
-        public int hashCode() {
-            int result = initiatorId.hashCode();
-            result = 31 * result + Long.hashCode(ts);
-            result = 31 * result + requestType.hashCode();
-            return result;
-        }
-    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 8fc68517c4c..b284ce1a622 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -105,6 +105,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -2027,7 +2028,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             return null;
         }).when(txManager).updateTxMeta(any(), any());
 
-        doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
+        Executor wise = Runnable::run;
+
+        doAnswer(invocation -> 
wise).when(txManager).writeIntentSwitchExecutor();
 
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index ecffcacf052..ecc3139b0ce 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -85,6 +85,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -964,7 +965,9 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
             return null;
         }).when(txManager).updateTxMeta(any(), any());
 
-        doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
+        Executor wise = Runnable::run;
+
+        doAnswer(invocation -> 
wise).when(txManager).writeIntentSwitchExecutor();
 
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 49c9d151b41..e2e2cb6f619 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -159,12 +160,11 @@ public interface TxManager extends IgniteComponent {
     LockManager lockManager();
 
     /**
-     * Execute write intent switch asynchronously.
+     * Returns the executor on which write intent switch actions are run asynchronously.
      *
-     * @param runnable Write intent switch action.
-     * @return Future that completes once the write intent switch action 
finishes.
+     * @return Executor.
      */
-    CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable);
+    Executor writeIntentSwitchExecutor();
 
     /**
      * Finishes a one-phase committed transaction. This method doesn't contain 
any distributed communication.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 573f855f6ae..0e4dcb9e88c 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -21,7 +21,6 @@ import static java.lang.Math.toIntExact;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.function.Function.identity;
@@ -1199,8 +1198,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     }
 
     @Override
-    public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable 
runnable) {
-        return runAsync(runnable, writeIntentSwitchPool);
+    public Executor writeIntentSwitchExecutor() {
+        return writeIntentSwitchPool;
     }
 
     void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp 
txIdAndTimestamp) {


Reply via email to