This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 87a7e19de1 IGNITE-20998 Fix timeouts in ItDurableFinishTest messaging (#2909)
87a7e19de1 is described below

commit 87a7e19de14c6449b4ca0b086f55911d210664b0
Author: Cyrill <[email protected]>
AuthorDate: Mon Dec 4 14:34:41 2023 +0300

    IGNITE-20998 Fix timeouts in ItDurableFinishTest messaging (#2909)
---
 .../ignite/internal/table/ItDurableFinishTest.java | 46 +++++-----------------
 1 file changed, 10 insertions(+), 36 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 606165ad51..c55f46f26e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -30,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
@@ -138,23 +137,15 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
     ) {
         DefaultMessagingService coordinatorMessaging = messaging(coordinatorNode);
 
-        CountDownLatch msg = new CountDownLatch(1);
-        CountDownLatch transfer = new CountDownLatch(1);
+        AtomicBoolean dropMessage = new AtomicBoolean(true);
 
         // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set,
         // and then temporarily block the messaging to simulate network issues.
         coordinatorMessaging.dropMessages((s, networkMessage) -> {
-            if (networkMessage instanceof TxFinishReplicaRequest) {
-                try {
-                    logger().info("Pausing message handling: {}.", networkMessage);
+            if (networkMessage instanceof TxFinishReplicaRequest && dropMessage.get()) {
+                logger().info("Dropping: {}.", networkMessage);
 
-                    transfer.countDown();
-                    msg.await();
-
-                    logger().info("Continue message handling: {}.", networkMessage);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                return true;
             }
 
             return false;
@@ -165,8 +156,6 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
         // will run in the current thread.
         CompletableFuture.runAsync(() -> {
             try {
-                transfer.await();
-
                 logger().info("Start transferring primary.");
 
                 NodeUtils.transferPrimary(tbl, null, this::node);
@@ -175,7 +164,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
             } finally {
                 logger().info("Finished transferring primary.");
 
-                msg.countDown();
+                dropMessage.set(false);
             }
         });
     }
@@ -235,28 +224,15 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
     ) {
         DefaultMessagingService primaryMessaging = messaging(primaryNode);
 
-        CountDownLatch msg = new CountDownLatch(1);
-        CountDownLatch transfer = new CountDownLatch(1);
-        AtomicBoolean messageHandled = new AtomicBoolean();
+        AtomicBoolean dropMessage = new AtomicBoolean(true);
 
         // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set,
         // and then temporarily block the messaging to simulate network issues.
         primaryMessaging.dropMessages((s, networkMessage) -> {
-            if (networkMessage instanceof TxCleanupReplicaRequest && !messageHandled.get()) {
-                messageHandled.set(true);
+            if (networkMessage instanceof TxCleanupReplicaRequest && dropMessage.get()) {
+                logger().info("Dropping message: {}.", networkMessage);
 
-                try {
-                    logger().info("Pausing message handling: {}.", networkMessage);
-
-                    transfer.countDown();
-                    msg.await();
-
-                    logger().info("Continue message handling: {}.", networkMessage);
-
-                    return true;
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                return true;
             }
 
             return false;
@@ -267,8 +243,6 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
         // will run in the current thread.
         CompletableFuture.runAsync(() -> {
             try {
-                transfer.await();
-
                 logger().info("Start transferring primary.");
 
                 NodeUtils.transferPrimary(tbl, null, this::node);
@@ -277,7 +251,7 @@ public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
             } finally {
                 logger().info("Finished transferring primary.");
 
-                msg.countDown();
+                dropMessage.set(false);
             }
         });
     }

Reply via email to