This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4291a209a21 IGNITE-27721 Replace TX cleanup futures with counter-based
inflights tracker (#7565)
4291a209a21 is described below
commit 4291a209a210ecba93167de0945f021cb0a9382b
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Feb 20 17:40:27 2026 +0200
IGNITE-27721 Replace TX cleanup futures with counter-based inflights
tracker (#7565)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 6 +-
.../partition/replicator/FuturesCleanupResult.java | 24 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 14 +-
.../internal/table/ItOperationRetryTest.java | 2 +-
.../replicator/PartitionReplicaListener.java | 319 ++++++++++-----------
.../replication/PartitionReplicaListenerTest.java | 5 +-
.../ZonePartitionReplicaListenerTest.java | 5 +-
.../org/apache/ignite/internal/tx/TxManager.java | 8 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 5 +-
9 files changed, 188 insertions(+), 200 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 139c939ae3b..4955eb80d04 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -230,8 +232,8 @@ public class FakeTxManager implements TxManager {
}
@Override
- public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable
runnable) {
- return CompletableFuture.runAsync(runnable);
+ public Executor writeIntentSwitchExecutor() {
+ return ForkJoinPool.commonPool();
}
@Override
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
index 6996b41c39c..e913fef6d74 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/FuturesCleanupResult.java
@@ -21,30 +21,14 @@ package org.apache.ignite.internal.partition.replicator;
* Contains result of cleanup futures await.
*/
public class FuturesCleanupResult {
- private final boolean hadReadFutures;
- private final boolean hadUpdateFutures;
- private final boolean forceCleanup;
+ private final boolean cleanupNeeded;
/** Constructor. */
- public FuturesCleanupResult(boolean hadReadFutures, boolean
hadUpdateFutures, boolean forceCleanup) {
- this.hadReadFutures = hadReadFutures;
- this.hadUpdateFutures = hadUpdateFutures;
- this.forceCleanup = forceCleanup;
- }
-
- public boolean hadReadFutures() {
- return hadReadFutures;
- }
-
- public boolean hadUpdateFutures() {
- return hadUpdateFutures;
- }
-
- public boolean forceCleanup() {
- return forceCleanup;
+ public FuturesCleanupResult(boolean cleanupNeeded) {
+ this.cleanupNeeded = cleanupNeeded;
}
public boolean shouldApplyWriteIntent() {
- return hadUpdateFutures() || forceCleanup();
+ return cleanupNeeded;
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index b3cc95884c5..f332642e032 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.distributed;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -159,12 +160,11 @@ public class
ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
new TestMetricManager()
) {
@Override
- public CompletableFuture<Void>
executeWriteIntentSwitchAsync(Runnable runnable) {
- CompletableFuture<Void> cleanupFuture =
super.executeWriteIntentSwitchAsync(runnable);
-
- cleanupFutures.add(cleanupFuture);
-
- return cleanupFuture;
+ public Executor writeIntentSwitchExecutor() {
+ return r -> {
+ CompletableFuture<Void> cleanupFuture =
runAsync(r, super.writeIntentSwitchExecutor());
+ cleanupFutures.add(cleanupFuture);
+ };
}
};
}
@@ -231,7 +231,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
txManager.lockManager()
);
- FuturesCleanupResult cleanupResult = new
FuturesCleanupResult(false, false, false);
+ FuturesCleanupResult cleanupResult = new
FuturesCleanupResult(false);
return completedFuture(new
ReplicaResult(cleanupResult, null));
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
index 657c744e0aa..147eab42b50 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
@@ -167,7 +167,7 @@ public class ItOperationRetryTest extends
ClusterPerTestIntegrationTest {
txCoordInhibitor.stopInhibit();
startNode(leaseholderNodeIdx);
- // Waiting unitl lease will be granted for the group and the
coordinator should be able to get the primary replica for the retry.
+ // Waiting until lease will be granted for the group and the
coordinator should be able to get the primary replica for the retry.
waitAndGetPrimaryReplica(transactionCoordinatorNode,
partitionGroupId);
// Finally we expect the upsert will succeed eventually because of
retry with new leaseholder enlisted.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 26872817b0d..ff0706ad805 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
@@ -78,6 +79,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -288,7 +291,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
private final Supplier<Map<Integer, IndexLocker>> indexesLockers;
- private final ConcurrentMap<UUID, TxCleanupReadyFutureList>
txCleanupReadyFutures = new ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, TxCleanupReadyState>
txCleanupReadyFutures = new ConcurrentHashMap<>();
/** Cleanup futures. */
private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap
= new ConcurrentHashMap<>();
@@ -502,11 +505,9 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
-
return appendTxCommand(
req.transactionId(),
- opId,
+ req.requestType(),
req.full(),
() -> processSingleEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -514,11 +515,9 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
var req = (ReadWriteSingleRowPkReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
-
return appendTxCommand(
req.transactionId(),
- opId,
+ req.requestType(),
req.full(),
() -> processSingleEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -526,11 +525,9 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
-
return appendTxCommand(
req.transactionId(),
- opId,
+ req.requestType(),
req.full(),
() -> processMultiEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -538,11 +535,9 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
var req = (ReadWriteMultiRowPkReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
-
return appendTxCommand(
req.transactionId(),
- opId,
+ req.requestType(),
req.full(),
() -> processMultiEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -550,11 +545,9 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
-
return appendTxCommand(
req.transactionId(),
- opId,
+ req.requestType(),
req.full(),
() -> processTwoEntriesAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -573,10 +566,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
.txLabel(req.txLabel())
.build());
- var opId = new OperationId(senderId, req.timestamp().longValue(),
RW_SCAN);
-
// Implicit RW scan can be committed locally on a last batch or
error.
- return appendTxCommand(req.transactionId(), opId, false, () ->
processScanRetrieveBatchAction(req))
+ return appendTxCommand(req.transactionId(), RW_SCAN, false, () ->
processScanRetrieveBatchAction(req))
.thenCompose(rows -> {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
@@ -1430,7 +1421,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
));
}
- return awaitCleanupReadyFutures(request.txId(), request.commit())
+ return awaitCleanupReadyFutures(request.txId())
.thenApply(res -> {
if (res.shouldApplyWriteIntent()) {
applyWriteIntentSwitchCommandLocally(request);
@@ -1440,42 +1431,36 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
});
}
- private CompletableFuture<FuturesCleanupResult>
awaitCleanupReadyFutures(UUID txId, boolean commit) {
- List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
- List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
+ private CompletableFuture<FuturesCleanupResult>
awaitCleanupReadyFutures(UUID txId) {
+ AtomicBoolean cleanupNeeded = new AtomicBoolean(true);
+ AtomicReference<CompletableFuture<Void>> cleanupReadyFutureRef = new
AtomicReference<>(nullCompletedFuture());
- AtomicBoolean forceCleanup = new AtomicBoolean(true);
-
- txCleanupReadyFutures.compute(txId, (id, txOps) -> {
- if (txOps == null) {
- return null;
- }
-
- // Cleanup futures (both read and update) are empty in two cases:
+ txCleanupReadyFutures.compute(txId, (id, txCleanupState) -> {
+ // Cleanup operations (both read and update) aren't registered in
two cases:
// - there were no actions in the transaction
// - write intent switch is being executed on the new primary (the
primary has changed after write intent appeared)
// Both cases are expected to happen extremely rarely so we are
fine to force the write intent switch.
// The reason for the forced switch is that otherwise write
intents would not be switched (if there is no volatile state and
- // FuturesCleanupResult.hadUpdateFutures() returns false).
- forceCleanup.set(txOps.isEmpty());
+ // txCleanupState.hadWrites() returns false).
+ boolean forceCleanup = txCleanupState == null ||
!txCleanupState.hadAnyOperations();
- txOps.futures.forEach((opId, future) -> {
- if (opId.requestType.isRwRead()) {
- txReadFutures.add(future);
- } else {
- txUpdateFutures.add(future);
- }
- });
+ if (txCleanupState == null) {
+ return null;
+ }
+
+ cleanupNeeded.set(txCleanupState.hadWrites() || forceCleanup);
- txOps.clear();
+ CompletableFuture<Void> fut =
txCleanupState.lockAndAwaitInflights();
+ cleanupReadyFutureRef.set(fut);
- return null;
+ return txCleanupState;
});
- return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
- .thenCompose(v -> allOfFuturesExceptionIgnored(txReadFutures,
commit, txId))
- .thenApply(v -> new
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty(),
forceCleanup.get()));
+ return cleanupReadyFutureRef.get()
+ .thenApplyAsync(v -> new
FuturesCleanupResult(cleanupNeeded.get()),
txManager.writeIntentSwitchExecutor())
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27904
proper cleanup.
+ .whenComplete((v, e) -> txCleanupReadyFutures.remove(txId));
}
private void
applyWriteIntentSwitchCommandLocally(WriteIntentSwitchReplicaRequestBase
request) {
@@ -1487,25 +1472,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
);
}
- /**
- * Creates a future that waits all transaction operations are completed.
- *
- * @param txFutures Transaction operation futures.
- * @param commit If {@code true} this is a commit otherwise a rollback.
- * @param txId Transaction id.
- * @return The future completes when all futures in passed list are
completed.
- */
- private static CompletableFuture<Void>
allOfFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures, boolean
commit, UUID txId) {
- return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
- .exceptionally(e -> {
- assert !commit :
- "Transaction is committing, but an operation has
completed with exception [txId=" + txId
- + ", err=" + e.getMessage() + ']';
-
- return null;
- });
- }
-
private void releaseTxLocks(UUID txId) {
lockManager.releaseAll(txId);
}
@@ -1582,14 +1548,14 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
* Appends an operation to prevent the race between commit/rollback and
the operation execution.
*
* @param txId Transaction id.
- * @param opId Operation id.
+ * @param requestType Request type.
* @param full {@code True} if a full transaction and can be immediately
committed.
* @param op Operation closure.
* @return A future object representing the result of the given operation.
*/
private <T> CompletableFuture<T> appendTxCommand(
UUID txId,
- OperationId opId,
+ RequestType requestType,
boolean full,
Supplier<CompletableFuture<T>> op
) {
@@ -1600,30 +1566,30 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
});
}
- var cleanupReadyFut = new CompletableFuture<Void>();
+ AtomicBoolean inflightStarted = new AtomicBoolean(false);
- txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+ TxCleanupReadyState txCleanupReadyState =
txCleanupReadyFutures.compute(txId, (id, txCleanupState) -> {
// First check whether the transaction has already been finished.
// And complete cleanupReadyFut with exception if it is the case.
TxStateMeta txStateMeta = txManager.stateMeta(txId);
if (txStateMeta == null || isFinalState(txStateMeta.txState()) ||
txStateMeta.txState() == FINISHING) {
- cleanupReadyFut.completeExceptionally(new Exception());
-
- return txOps;
+ // Don't start inflight.
+ return txCleanupState;
}
- // Otherwise collect cleanupReadyFut in the transaction's futures.
- if (txOps == null) {
- txOps = new TxCleanupReadyFutureList();
+ // Otherwise start new inflight in txCleanupState.
+ if (txCleanupState == null) {
+ txCleanupState = new TxCleanupReadyState();
}
- txOps.putOrReplaceFuture(opId, cleanupReadyFut);
+ boolean started = txCleanupState.startInflight(requestType);
+ inflightStarted.set(started);
- return txOps;
+ return txCleanupState;
});
- if (cleanupReadyFut.isCompletedExceptionally()) {
+ if (!inflightStarted.get()) {
TxStateMeta txStateMeta = txManager.stateMeta(txId);
TxState txState = txStateMeta == null ? null :
txStateMeta.txState();
@@ -1639,26 +1605,25 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
CompletableFuture<T> fut = op.get();
+ // If inflightStarted then txCleanupReadyState is not null.
+ requireNonNull(txCleanupReadyState, "txCleanupReadyState cannot be
null here.");
+
fut.whenComplete((v, th) -> {
if (th != null) {
- cleanupReadyFut.completeExceptionally(th);
+ txCleanupReadyState.completeInflight(txId);
} else {
if (v instanceof ReplicaResult) {
ReplicaResult res = (ReplicaResult) v;
if (res.applyResult().replicationFuture() != null) {
res.applyResult().replicationFuture().whenComplete((v0, th0) -> {
- if (th0 != null) {
- cleanupReadyFut.completeExceptionally(th0);
- } else {
- cleanupReadyFut.complete(null);
- }
+ txCleanupReadyState.completeInflight(txId);
});
} else {
- cleanupReadyFut.complete(null);
+ txCleanupReadyState.completeInflight(txId);
}
} else {
- cleanupReadyFut.complete(null);
+ txCleanupReadyState.completeInflight(txId);
}
}
});
@@ -3458,19 +3423,24 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
// and a concurrent RO transaction resolves the same row, hence
computeIfAbsent.
// We don't need to take the partition snapshots read lock, see
#INTERNAL_DOC_PLACEHOLDER why.
- return txManager.executeWriteIntentSwitchAsync(() ->
inBusyLock(busyLock,
- () -> storageUpdateHandler.switchWriteIntents(
- txId,
- txState == COMMITTED,
- commitTimestamp,
- indexIdsAtRwTxBeginTsOrNull(txId)
+ return runAsync(
+ () -> inBusyLock(
+ busyLock,
+ () ->
storageUpdateHandler.switchWriteIntents(
+ txId,
+ txState == COMMITTED,
+ commitTimestamp,
+ indexIdsAtRwTxBeginTsOrNull(txId)
+ )
+ ),
+ txManager.writeIntentSwitchExecutor()
)
- )).whenComplete((unused, e) -> {
- if (e != null &&
!ReplicatorRecoverableExceptions.isRecoverable(e)) {
- LOG.warn("Failed to complete transaction cleanup command
{}", e,
- formatTxInfo(txId, txManager));
- }
- });
+ .whenComplete((unused, e) -> {
+ if (e != null &&
!ReplicatorRecoverableExceptions.isRecoverable(e)) {
+ LOG.warn("Failed to complete transaction cleanup
command {}", e,
+ formatTxInfo(txId, txManager));
+ }
+ });
});
future.handle((v, e) -> rowCleanupMap.remove(rowId, future));
@@ -3647,22 +3617,103 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
}
/**
- * Class that stores a list of futures for operations that has happened in
a specific transaction. Also, the class has a property
- * {@code state} that represents a transaction state.
+ * Class that stores a counter of inflight operations for a transaction.
+ *
+ * <p>Synchronization model:
+ * <ul>
+ * <li>{@code hadAnyOperations}, {@code hadWrites} — plain fields,
only accessed inside {@code compute()} critical section.</li>
+ * <li>{@code inflightOperationsCount} — {@link AtomicInteger},
cross-thread safe.</li>
+ * <li>{@code completionFuture} — volatile, written from {@code
compute()}, read cross-thread.
+ * Non-null value also serves as the "locked" indicator (no new
inflights accepted).</li>
+ * </ul>
*/
- private static class TxCleanupReadyFutureList {
- final Map<OperationId, CompletableFuture<?>> futures = new HashMap<>();
+ private static class TxCleanupReadyState {
+ // Only accessed inside compute() critical section.
+ boolean hadAnyOperations = false;
+ boolean hadWrites = false;
+
+ final AtomicInteger inflightOperationsCount = new AtomicInteger(0);
+
+ // Non-null means locked (no new inflights accepted). Written from
compute(), read cross-thread.
+ volatile CompletableFuture<Void> completionFuture = null;
- boolean isEmpty() {
- return futures.isEmpty();
+ // Should be called inside critical section on transaction.
+ boolean hadAnyOperations() {
+ return hadAnyOperations;
}
- void clear() {
- futures.clear();
+ // Should be called inside critical section on transaction.
+ boolean hadWrites() {
+ return hadWrites;
}
- void putOrReplaceFuture(OperationId opId, CompletableFuture<?> fut) {
- futures.put(opId, fut);
+ // Should be called inside critical section on transaction.
+ CompletableFuture<Void> lockAndAwaitInflights() {
+ CompletableFuture<Void> f = completionFuture;
+
+ if (f != null) {
+ return f; // Already locked.
+ }
+
+ if (inflightOperationsCount.get() == 0) {
+ f = nullCompletedFuture();
+ completionFuture = f;
+ return f;
+ }
+
+ f = new CompletableFuture<>();
+ completionFuture = f;
+
+ // Recheck: a cross-thread completeInflight() may have decremented
to 0
+ // before seeing completionFuture != null.
+ if (inflightOperationsCount.get() == 0) {
+ f.complete(null);
+ }
+
+ return f;
+ }
+
+ // Should be called inside critical section on transaction.
+ boolean startInflight(RequestType requestType) {
+ if (completionFuture != null) {
+ return false;
+ }
+
+ hadAnyOperations = true;
+
+ if (requestType.isWrite()) {
+ hadWrites = true;
+ }
+
+ inflightOperationsCount.incrementAndGet();
+
+ return true;
+ }
+
+ // Cross-thread.
+ void completeInflight(UUID txId) {
+ int remaining = inflightOperationsCount.decrementAndGet();
+
+ if (remaining < 0) {
+ LOG.error("Removed inflight when there were no inflights
[txId={}]", txId);
+ }
+
+ if (remaining == 0) {
+ completeFutureIfAny();
+ }
+ }
+
+ private void completeFutureIfAny() {
+ CompletableFuture<Void> f = completionFuture;
+
+ if (f == null || f.isDone()) {
+ return;
+ }
+
+ // Re-check inflightOperationsCount once the state is seen as locked, because
we are outside of the critical section.
+ if (inflightOperationsCount.get() == 0) {
+ f.complete(null);
+ }
}
}
@@ -3874,58 +3925,4 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
public void cleanupLocally(UUID txId, boolean commit, @Nullable
HybridTimestamp commitTimestamp) {
storageUpdateHandler.switchWriteIntents(txId, commit, commitTimestamp,
null);
}
-
- /**
- * Operation unique identifier.
- */
- private static class OperationId {
- /** Operation node initiator id. */
- private final UUID initiatorId;
-
- /** Timestamp. */
- private final long ts;
-
- /** Request typ. */
- private final RequestType requestType;
-
- /**
- * The constructor.
- *
- * @param initiatorId Sender node id.
- * @param ts Timestamp.
- */
- public OperationId(UUID initiatorId, long ts, RequestType requestType)
{
- this.initiatorId = initiatorId;
- this.ts = ts;
- this.requestType = requestType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- OperationId that = (OperationId) o;
-
- if (ts != that.ts) {
- return false;
- }
- if (requestType != that.requestType) {
- return false;
- }
- return initiatorId.equals(that.initiatorId);
- }
-
- @Override
- public int hashCode() {
- int result = initiatorId.hashCode();
- result = 31 * result + Long.hashCode(ts);
- result = 31 * result + requestType.hashCode();
- return result;
- }
- }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 8fc68517c4c..b284ce1a622 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -105,6 +105,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -2027,7 +2028,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
return null;
}).when(txManager).updateTxMeta(any(), any());
- doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
+ Executor wise = Runnable::run;
+
+ doAnswer(invocation ->
wise).when(txManager).writeIntentSwitchExecutor();
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index ecffcacf052..ecc3139b0ce 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -85,6 +85,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -964,7 +965,9 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
return null;
}).when(txManager).updateTxMeta(any(), any());
- doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
+ Executor wise = Runnable::run;
+
+ doAnswer(invocation ->
wise).when(txManager).writeIntentSwitchExecutor();
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 49c9d151b41..e2e2cb6f619 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -159,12 +160,11 @@ public interface TxManager extends IgniteComponent {
LockManager lockManager();
/**
- * Execute write intent switch asynchronously.
+ * Executor on which write intent switch actions are run asynchronously.
*
- * @param runnable Write intent switch action.
- * @return Future that completes once the write intent switch action
finishes.
+ * @return Executor.
*/
- CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable);
+ Executor writeIntentSwitchExecutor();
/**
* Finishes a one-phase committed transaction. This method doesn't contain
any distributed communication.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 573f855f6ae..0e4dcb9e88c 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -21,7 +21,6 @@ import static java.lang.Math.toIntExact;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.function.Function.identity;
@@ -1199,8 +1198,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
}
@Override
- public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable
runnable) {
- return runAsync(runnable, writeIntentSwitchPool);
+ public Executor writeIntentSwitchExecutor() {
+ return writeIntentSwitchPool;
}
void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp
txIdAndTimestamp) {