This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 87a7e19de1 IGNITE-20998 Fix timeouts in ItDurableFinishTest messaging
(#2909)
87a7e19de1 is described below
commit 87a7e19de14c6449b4ca0b086f55911d210664b0
Author: Cyrill <[email protected]>
AuthorDate: Mon Dec 4 14:34:41 2023 +0300
IGNITE-20998 Fix timeouts in ItDurableFinishTest messaging (#2909)
---
.../ignite/internal/table/ItDurableFinishTest.java | 46 +++++-----------------
1 file changed, 10 insertions(+), 36 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 606165ad51..c55f46f26e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -30,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
@@ -138,23 +137,15 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
) {
DefaultMessagingService coordinatorMessaging =
messaging(coordinatorNode);
- CountDownLatch msg = new CountDownLatch(1);
- CountDownLatch transfer = new CountDownLatch(1);
+ AtomicBoolean dropMessage = new AtomicBoolean(true);
// Make sure the finish message is prepared, i.e. the outcome, commit
timestamp, primary node, etc. have been set,
// and then temporarily block the messaging to simulate network issues.
coordinatorMessaging.dropMessages((s, networkMessage) -> {
- if (networkMessage instanceof TxFinishReplicaRequest) {
- try {
- logger().info("Pausing message handling: {}.",
networkMessage);
+ if (networkMessage instanceof TxFinishReplicaRequest &&
dropMessage.get()) {
+ logger().info("Dropping: {}.", networkMessage);
- transfer.countDown();
- msg.await();
-
- logger().info("Continue message handling: {}.",
networkMessage);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ return true;
}
return false;
@@ -165,8 +156,6 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
// will run in the current thread.
CompletableFuture.runAsync(() -> {
try {
- transfer.await();
-
logger().info("Start transferring primary.");
NodeUtils.transferPrimary(tbl, null, this::node);
@@ -175,7 +164,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
} finally {
logger().info("Finished transferring primary.");
- msg.countDown();
+ dropMessage.set(false);
}
});
}
@@ -235,28 +224,15 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
) {
DefaultMessagingService primaryMessaging = messaging(primaryNode);
- CountDownLatch msg = new CountDownLatch(1);
- CountDownLatch transfer = new CountDownLatch(1);
- AtomicBoolean messageHandled = new AtomicBoolean();
+ AtomicBoolean dropMessage = new AtomicBoolean(true);
// Make sure the finish message is prepared, i.e. the outcome, commit
timestamp, primary node, etc. have been set,
// and then temporarily block the messaging to simulate network issues.
primaryMessaging.dropMessages((s, networkMessage) -> {
- if (networkMessage instanceof TxCleanupReplicaRequest &&
!messageHandled.get()) {
- messageHandled.set(true);
+ if (networkMessage instanceof TxCleanupReplicaRequest &&
dropMessage.get()) {
+ logger().info("Dropping message: {}.", networkMessage);
- try {
- logger().info("Pausing message handling: {}.",
networkMessage);
-
- transfer.countDown();
- msg.await();
-
- logger().info("Continue message handling: {}.",
networkMessage);
-
- return true;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ return true;
}
return false;
@@ -267,8 +243,6 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
// will run in the current thread.
CompletableFuture.runAsync(() -> {
try {
- transfer.await();
-
logger().info("Start transferring primary.");
NodeUtils.transferPrimary(tbl, null, this::node);
@@ -277,7 +251,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
} finally {
logger().info("Finished transferring primary.");
- msg.countDown();
+ dropMessage.set(false);
}
});
}