This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c02b3ea7eb IGNITE-18617 Clear rw tx context and cleanup ready futures
on tx finish (#3436)
c02b3ea7eb is described below
commit c02b3ea7eb15fd62bb48cac527fa49b700accf57
Author: Cyrill <[email protected]>
AuthorDate: Thu Mar 21 17:45:09 2024 +0300
IGNITE-18617 Clear rw tx context and cleanup ready futures on tx finish
(#3436)
---
.../internal/testframework/IgniteTestUtils.java | 11 ++++++++++-
.../replicator/PartitionReplicaListener.java | 20 ++++++++++++--------
.../internal/tx/impl/TransactionInflights.java | 13 +++++++++----
.../ignite/internal/tx/impl/TxManagerImpl.java | 2 +-
4 files changed, 32 insertions(+), 14 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index f6aa4ce8c8..1b76ccef0f 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -289,7 +289,16 @@ public final class IgniteTestUtils {
try {
run.execute();
} catch (Throwable throwable) {
- assertInstanceOf(expectedClass, throwable);
+ try {
+ assertInstanceOf(expectedClass, throwable);
+ } catch (AssertionError err) {
+ // An AssertionError from assertInstanceOf has nothing but a
class name of the original exception.
+ AssertionError assertionError = new AssertionError(err);
+
+ assertionError.addSuppressed(throwable);
+
+ throw assertionError;
+ }
IgniteException igniteException = (IgniteException) throwable;
assertEquals(expectedErrorCode, igniteException.code());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b16202cabf..755992f497 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1730,7 +1730,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
- // TODO https://issues.apache.org/jira/browse/IGNITE-18617
txCleanupReadyFutures.compute(txId, (id, txOps) -> {
if (txOps == null) {
return null;
@@ -1746,7 +1745,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
txOps.futures.clear();
- return txOps;
+ return null;
});
return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
@@ -1894,18 +1893,23 @@ public class PartitionReplicaListener implements
ReplicaListener {
var cleanupReadyFut = new CompletableFuture<Void>();
txCleanupReadyFutures.compute(txId, (id, txOps) -> {
- if (txOps == null) {
- txOps = new TxCleanupReadyFutureList();
- }
-
+ // First check whether the transaction has already been finished.
+ // And complete cleanupReadyFut with exception if it is the case.
TxStateMeta txStateMeta = txManager.stateMeta(txId);
if (txStateMeta == null || isFinalState(txStateMeta.txState())) {
cleanupReadyFut.completeExceptionally(new Exception());
- } else {
- txOps.futures.computeIfAbsent(cmdType, type -> new
ArrayList<>()).add(cleanupReadyFut);
+
+ return txOps;
+ }
+
+ // Otherwise collect cleanupReadyFut in the transaction's futures.
+ if (txOps == null) {
+ txOps = new TxCleanupReadyFutureList();
}
+ txOps.futures.computeIfAbsent(cmdType, type -> new
ArrayList<>()).add(cleanupReadyFut);
+
return txOps;
});
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index b303532fe9..bdbe1d0ab2 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -87,16 +87,17 @@ public class TransactionInflights {
* @param txId The transaction id.
*/
public void removeInflight(UUID txId) {
- TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> {
- assert ctx != null : format("No tx context found on removing
inflight [txId={}]", txId);
-
+ // Can be null if tx was aborted and inflights were removed from the
collection.
+ TxContext tuple = txCtxMap.computeIfPresent(txId, (uuid, ctx) -> {
ctx.removeInflight(txId);
return ctx;
});
// Avoid completion under lock.
- tuple.onInflightsRemoved();
+ if (tuple != null) {
+ tuple.onInflightsRemoved();
+ }
}
Collection<UUID> finishedReadOnlyTransactions() {
@@ -106,6 +107,10 @@ public class TransactionInflights {
.collect(toSet());
}
+ void removeTxContext(UUID txId) {
+ txCtxMap.remove(txId);
+ }
+
void removeTxContexts(Collection<UUID> txIds) {
txCtxMap.keySet().removeAll(txIds);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 2f0c40e4de..156d3bf50a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -517,7 +517,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
decrementRwTxCount(txId);
}
- });
+ }).whenComplete((unused, throwable) ->
transactionInflights.removeTxContext(txId));
}
private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID
txId, TransactionMeta stateMeta) {