This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e1e208a6ab IGNITE-20953 Move cleanup out of the TX locks (#2870)
e1e208a6ab is described below
commit e1e208a6ab968b54664e975fab6837fa02a74ee6
Author: Cyrill <[email protected]>
AuthorDate: Mon Nov 27 10:59:27 2023 +0300
IGNITE-20953 Move cleanup out of the TX locks (#2870)
---
.../replicator/PartitionReplicaListener.java | 86 ++++++++++++++--------
1 file changed, 54 insertions(+), 32 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 543967ff77..ed862682fa 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1710,11 +1710,38 @@ public class PartitionReplicaListener implements ReplicaListener {
markFinished(request.txId(), txState, request.commitTimestamp());
+ return awaitCleanupReadyFutures(request.txId(), request.commit())
+ .thenCompose(res -> {
+ if (res.hadUpdateFutures()) {
+ HybridTimestamp commandTimestamp = hybridClock.now();
+
+ return reliableCatalogVersionFor(commandTimestamp)
+ .thenCompose(catalogVersion ->
+ applyCleanupCommand(
+ request.txId(),
+ request.commit(),
+ request.commitTimestamp(),
+ request.commitTimestampLong(),
+ catalogVersion
+ ))
+ .thenApply(unused -> res);
+ } else {
+ return completedFuture(res);
+ }
+ })
+ .thenAccept(res -> {
+ if (res.hadUpdateFutures() || res.hadReadFutures()) {
+ releaseTxLocks(request.txId());
+ }
+ });
+ }
+
+ private CompletableFuture<FuturesCleanupResult> awaitCleanupReadyFutures(UUID txId, boolean commit) {
List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
// TODO https://issues.apache.org/jira/browse/IGNITE-18617
- txCleanupReadyFutures.compute(request.txId(), (id, txOps) -> {
+ txCleanupReadyFutures.compute(txId, (id, txOps) -> {
if (txOps == null) {
return null;
}
@@ -1732,32 +1759,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return txOps;
});
- if (txUpdateFutures.isEmpty()) {
- if (!txReadFutures.isEmpty()) {
- return allOffFuturesExceptionIgnored(txReadFutures, request)
- .thenRun(() -> releaseTxLocks(request.txId()));
- }
-
- return completedFuture(null);
- }
-
- return allOffFuturesExceptionIgnored(txUpdateFutures, request).thenCompose(v -> {
- HybridTimestamp commandTimestamp = hybridClock.now();
-
- return reliableCatalogVersionFor(commandTimestamp)
- .thenCompose(catalogVersion -> {
- applyCleanupCommand(
- request.txId(),
- request.commit(),
- request.commitTimestamp(),
- request.commitTimestampLong(),
- catalogVersion
- );
-
- return allOffFuturesExceptionIgnored(txReadFutures, request)
- .thenRun(() -> releaseTxLocks(request.txId()));
- });
- });
+ return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
+ .thenCompose(v -> allOfFuturesExceptionIgnored(txReadFutures, commit, txId))
+ .thenApply(v -> new FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
}
private CompletableFuture<Void> applyCleanupCommand(
@@ -1803,15 +1807,15 @@ public class PartitionReplicaListener implements ReplicaListener {
* Creates a future that waits all transaction operations are completed.
*
* @param txFutures Transaction operation futures.
- * @param request Cleanup request.
+ * @param commit If {@code true} this is a commit otherwise a rollback.
+ * @param txId Transaction id.
* @return The future completes when all futures in passed list are completed.
*/
- private static CompletableFuture<Void> allOffFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures,
- TxCleanupReplicaRequest request) {
+ private static CompletableFuture<Void> allOfFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures, boolean commit, UUID txId) {
return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
.exceptionally(e -> {
- assert !request.commit() :
- "Transaction is committing, but an operation has completed with exception [txId=" + request.txId()
+ assert !commit :
+ "Transaction is committing, but an operation has completed with exception [txId=" + txId
+ ", err=" + e.getMessage() + ']';
return null;
@@ -3757,4 +3761,22 @@ public class PartitionReplicaListener implements ReplicaListener {
.finish(request.finish())
.build();
}
+
+ private static class FuturesCleanupResult {
+ private final boolean hadReadFutures;
+ private final boolean hadUpdateFutures;
+
+ public FuturesCleanupResult(boolean hadReadFutures, boolean hadUpdateFutures) {
+ this.hadReadFutures = hadReadFutures;
+ this.hadUpdateFutures = hadUpdateFutures;
+ }
+
+ public boolean hadReadFutures() {
+ return hadReadFutures;
+ }
+
+ public boolean hadUpdateFutures() {
+ return hadUpdateFutures;
+ }
+ }
}