This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c25f9fdac1 IGNITE-22245 Force switch write intents (#3765)
c25f9fdac1 is described below

commit c25f9fdac1668c46e27a6ff2190f58948d0f75ef
Author: Cyrill <[email protected]>
AuthorDate: Thu May 16 10:50:27 2024 +0300

    IGNITE-22245 Force switch write intents (#3765)
---
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |  2 +-
 .../internal/table/ItTxResourcesVacuumTest.java    | 61 ++++++++++------------
 .../internal/table/distributed/TableManager.java   |  9 +---
 .../replicator/PartitionReplicaListener.java       | 23 ++++++--
 .../internal/tx/impl/TxCleanupRequestSender.java   |  4 +-
 5 files changed, 52 insertions(+), 47 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index c8846ca134..a157eee51c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -188,7 +188,7 @@ public class FSMCallerImpl implements FSMCaller {
         }
         this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
         this.msgFactory = opts.getRaftMessagesFactory();
-        LOG.info("Starts FSMCaller successfully.");
+        LOG.info("Starts FSMCaller successfully [nodeId={}].", nodeId);
         return true;
     }
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index 2b320a5740..cd40ce6168 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -218,7 +218,6 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         view.upsert(parallelTx2, tupleForParallelTx);
 
         CompletableFuture<Void> finishStartedFuture = new 
CompletableFuture<>();
-        CompletableFuture<Void> finishAllowedFuture = new 
CompletableFuture<>();
 
         node.dropMessages((n, msg) -> {
             if (msg instanceof TxFinishReplicaRequest) {
@@ -227,7 +226,9 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
                 if (finishRequest.txId().equals(txId)) {
                     finishStartedFuture.complete(null);
 
-                    joinWithTimeout(finishAllowedFuture);
+                    log.info("Test: dropping finish on [node= {}].", n);
+
+                    return true;
                 }
             }
 
@@ -248,7 +249,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         // Check that the volatile state of the transaction is preserved.
         assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
 
-        assertTrue(finishAllowedFuture.complete(null));
+        node.stopDroppingMessages();
 
         assertThat(commitFut, willCompleteSuccessfully());
 
@@ -395,16 +396,17 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         view.upsert(tx, tuple1);
 
         CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
-        CompletableFuture<Void> cleanupAllowed = new CompletableFuture<>();
 
         commitPartitionLeaseholder.dropMessages((n, msg) -> {
             if (msg instanceof TxCleanupMessage) {
-                cleanupStarted.complete(null);
-
-                log.info("Test: cleanup started.");
+                log.info("Test: cleanup started on [node= {}].", n);
 
                 if (commitPartNodes.contains(n)) {
-                    joinWithTimeout(cleanupAllowed);
+                    cleanupStarted.complete(null);
+
+                    log.info("Test: dropping cleanup on [node= {}].", n);
+
+                    return true;
                 }
             }
 
@@ -424,7 +426,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), 
txId, commitPartId, false);
 
         // Unblocking cleanup.
-        assertTrue(cleanupAllowed.complete(null));
+        commitPartitionLeaseholder.stopDroppingMessages();
 
         assertThat(commitFut, willCompleteSuccessfully());
 
@@ -490,14 +492,19 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         view.upsert(tx, tuple);
 
         CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
-        CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
         boolean[] cleanupAllowed = new boolean[1];
 
         commitPartitionLeaseholder.dropMessages((n, msg) -> {
-            if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
+            if (msg instanceof TxCleanupMessage) {
+                log.info("Test: perform cleanup on [node= {}, msg={}].", n, 
msg);
+
                 cleanupStarted.complete(null);
 
-                joinWithTimeout(cleanupAllowedFut);
+                if (!cleanupAllowed[0]) {
+                    log.info("Test: dropping cleanup on [node= {}].", n);
+
+                    return true;
+                }
             }
 
             return false;
@@ -511,8 +518,6 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
 
         transferPrimary(cluster.runningNodes().collect(toSet()), 
commitPartGrpId, commitPartNodes::contains);
 
-        assertTrue(cleanupAllowedFut.complete(null));
-
         cleanupAllowed[0] = true;
 
         assertThat(commitFut, willCompleteSuccessfully());
@@ -521,10 +526,10 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
 
         waitForTxStateVacuum(txId, commitPartId, true, 10_000);
 
-        Transaction roTxAfter = beginReadOnlyTx(anyNode());
-
         log.info("Test: checking values.");
 
+        Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
         // Trying to read the value.
         Tuple key = Tuple.create().set("key", tuple.longValue("key"));
         checkValueReadOnly(view, roTxBefore, key, null);
@@ -575,19 +580,20 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         view.upsert(tx, tuple);
 
         CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
-        CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
         boolean[] cleanupAllowed = new boolean[1];
 
         // Cleanup may be triggered by the primary replica reelection as well.
         runningNodes().filter(n -> 
commitPartNodes.contains(n.name())).forEach(nd -> nd.dropMessages((n, msg) -> {
-            if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
-                cleanupStarted.complete(null);
+            if (msg instanceof TxCleanupMessage) {
+                log.info("Test: perform cleanup on [node= {}, msg={}].", n, 
msg);
 
-                log.warn("Test: cleanup started.");
+                cleanupStarted.complete(null);
 
-                joinWithTimeout(cleanupAllowedFut);
+                if (!cleanupAllowed[0]) {
+                    log.info("Test: dropping cleanup on [node= {}].", n);
 
-                log.info("Test: cleanup resumed.");
+                    return true;
+                }
             }
 
             return false;
@@ -609,8 +615,6 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
 
         log.info("Test: volatile state vacuumized");
 
-        assertTrue(cleanupAllowedFut.complete(null));
-
         cleanupAllowed[0] = true;
 
         assertThat(commitFut, willCompleteSuccessfully());
@@ -1066,13 +1070,4 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
                 .get();
     }
 
-    private void joinWithTimeout(CompletableFuture<?> future) {
-        future.orTimeout(60, TimeUnit.SECONDS)
-                .exceptionally(e -> {
-                    log.error("Could not wait for the future.", e);
-
-                    return null;
-                })
-                .join();
-    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 5dfb1ad1e4..dec713da93 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1610,14 +1610,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
      * @return Future.
      */
     public CompletableFuture<TableViewInternal> tableAsync(long 
causalityToken, int id) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return tablesById(causalityToken).thenApply(tablesById -> 
tablesById.get(id));
-        } finally {
-            busyLock.leaveBusy();
-        }
+        return inBusyLockAsync(busyLock, () -> 
tablesById(causalityToken).thenApply(tablesById -> tablesById.get(id)));
     }
 
     @Override
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 00ca2e230a..7113ee5b68 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1802,7 +1802,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         return awaitCleanupReadyFutures(request.txId(), request.commit())
                 .thenCompose(res -> {
-                    if (res.hadUpdateFutures()) {
+                    if (res.hadUpdateFutures() || res.forceCleanup()) {
                         HybridTimestamp commandTimestamp = clockService.now();
 
                         return reliableCatalogVersionFor(commandTimestamp)
@@ -1830,11 +1830,22 @@ public class PartitionReplicaListener implements ReplicaListener {
         List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
         List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
 
+        AtomicBoolean forceCleanup = new AtomicBoolean(true);
+
         txCleanupReadyFutures.compute(txId, (id, txOps) -> {
             if (txOps == null) {
                 return null;
             }
 
+            // Cleanup futures (both read and update) are empty in two cases:
+            // - there were no actions in the transaction
+            // - write intent switch is being executed on the new primary (the 
primary has changed after write intent appeared)
+            // Both cases are expected to happen extremely rarely so we are 
fine to force the write intent switch.
+
+            // The reason for the forced switch is that otherwise write 
intents would not be switched (if there is no volatile state and
+            // FuturesCleanupResult.hadUpdateFutures() returns false).
+            forceCleanup.set(txOps.futures.isEmpty());
+
             txOps.futures.forEach((opType, futures) -> {
                 if (opType.isRwRead()) {
                     txReadFutures.addAll(futures.values());
@@ -1850,7 +1861,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
                 .thenCompose(v -> allOfFuturesExceptionIgnored(txReadFutures, 
commit, txId))
-                .thenApply(v -> new 
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
+                .thenApply(v -> new 
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty(), 
forceCleanup.get()));
     }
 
     private CompletableFuture<WriteIntentSwitchReplicatedInfo> 
applyWriteIntentSwitchCommand(
@@ -3951,10 +3962,12 @@ public class PartitionReplicaListener implements ReplicaListener {
     private static class FuturesCleanupResult {
         private final boolean hadReadFutures;
         private final boolean hadUpdateFutures;
+        private final boolean forceCleanup;
 
-        public FuturesCleanupResult(boolean hadReadFutures, boolean 
hadUpdateFutures) {
+        public FuturesCleanupResult(boolean hadReadFutures, boolean 
hadUpdateFutures, boolean forceCleanup) {
             this.hadReadFutures = hadReadFutures;
             this.hadUpdateFutures = hadUpdateFutures;
+            this.forceCleanup = forceCleanup;
         }
 
         public boolean hadReadFutures() {
@@ -3964,6 +3977,10 @@ public class PartitionReplicaListener implements ReplicaListener {
         public boolean hadUpdateFutures() {
             return hadUpdateFutures;
         }
+
+        public boolean forceCleanup() {
+            return forceCleanup;
+        }
     }
 
     private CompletableFuture<?> processOperationRequestWithTxRwCounter(
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index dcf97ff5da..82cfee4fab 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -51,7 +51,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TxCleanupRequestSender {
     /** Logger. */
-    private final IgniteLogger log = 
Loggers.forClass(TxCleanupRequestSender.class);
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxCleanupRequestSender.class);
 
     /** Placement driver helper. */
     private final PlacementDriverHelper placementDriverHelper;
@@ -96,7 +96,7 @@ public class TxCleanupRequestSender {
                 if (msg instanceof TxCleanupMessageErrorResponse) {
                     TxCleanupMessageErrorResponse response = 
(TxCleanupMessageErrorResponse) msg;
 
-                    log.warn("Exception happened during transaction cleanup 
[txId={}].", response.throwable(), response.txId());
+                    LOG.warn("Exception happened during transaction cleanup 
[txId={}].", response.throwable(), response.txId());
                 }
             }
         });

Reply via email to