This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a65f242670 IGNITE-27828 Fix flaky nodeCannotCommunicateAfterLeavingPhysicalTopology (#7582)
3a65f242670 is described below

commit 3a65f242670c71c6c40259fb527f330b593997e5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Feb 12 15:55:26 2026 +0200

    IGNITE-27828 Fix flaky nodeCannotCommunicateAfterLeavingPhysicalTopology (#7582)
---
 .../scalecube/ItScaleCubeNetworkMessagingTest.java | 119 ++++++++-------------
 1 file changed, 46 insertions(+), 73 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 9446dcb5ddf..d108a7ea914 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
@@ -96,6 +97,7 @@ import 
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeMan
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource;
@@ -115,7 +117,7 @@ import reactor.core.publisher.Mono;
 /**
  * Integration tests for messaging based on ScaleCube.
  */
-class ItScaleCubeNetworkMessagingTest {
+class ItScaleCubeNetworkMessagingTest extends BaseIgniteAbstractTest {
     /** Message sent to establish a connection. */
     private static final String TRAILBLAZER = "trailblazer";
 
@@ -142,7 +144,7 @@ class ItScaleCubeNetworkMessagingTest {
     }
 
     @AfterEach
-    void tearDown() throws Exception {
+    void tearDown() {
         testCluster.shutdown();
         logInspectors.forEach(LogInspector::stop);
         logInspectors.clear();
@@ -329,8 +331,7 @@ class ItScaleCubeNetworkMessagingTest {
     }
 
     /**
-     * Tests that if the network component is stopped while making an "invoke" 
call, the corresponding future completes
-     * exceptionally.
+     * Tests that if the network component is stopped while making an "invoke" 
call, the corresponding future completes exceptionally.
      */
     @Test
     public void testInvokeDuringStop() throws InterruptedException {
@@ -524,18 +525,17 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception in case of errors.
      */
     @SuppressWarnings("ConstantConditions")
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void nodeCannotReuseOldId(boolean keepPreExistingConnections) 
throws Exception {
+    @Test
+    public void nodeCannotReuseOldId() throws Exception {
         testCluster = new Cluster(3, testInfo);
 
         testCluster.startAwait();
 
-        String outcastName = 
testCluster.members.get(testCluster.members.size() - 1).nodeName();
+        ClusterService outcast = 
testCluster.members.get(testCluster.members.size() - 1);
 
-        knockOutNode(outcastName, !keepPreExistingConnections);
+        knockOutNode(outcast);
 
-        IgniteBiTuple<CountDownLatch, AtomicBoolean> pair = 
reanimateNode(outcastName);
+        IgniteBiTuple<CountDownLatch, AtomicBoolean> pair = 
reanimateNode(outcast);
         CountDownLatch ready = pair.get1();
         AtomicBoolean reappeared = pair.get2();
 
@@ -544,9 +544,8 @@ class ItScaleCubeNetworkMessagingTest {
         assertThat(reappeared.get(), is(false));
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void nodeCannotCommunicateAfterLeavingPhysicalTopology(boolean 
keepPreExistingConnections) throws Exception {
+    @Test
+    public void nodeCannotCommunicateAfterLeavingPhysicalTopology() throws 
Exception {
         testCluster = new Cluster(3, testInfo);
 
         testCluster.startAwait();
@@ -559,14 +558,12 @@ class ItScaleCubeNetworkMessagingTest {
         assertNotNull(outcastNode);
         assertNotNull(notOutcastNode);
 
-        if (keepPreExistingConnections) {
-            assertThat(notOutcast.messagingService().send(outcastNode, 
messageFactory.testMessage().build()), willCompleteSuccessfully());
-            assertThat(outcast.messagingService().send(notOutcastNode, 
messageFactory.testMessage().build()), willCompleteSuccessfully());
-        }
+        assertThat(notOutcast.messagingService().send(outcastNode, 
messageFactory.testMessage().build()), willCompleteSuccessfully());
+        assertThat(outcast.messagingService().send(notOutcastNode, 
messageFactory.testMessage().build()), willCompleteSuccessfully());
 
-        knockOutNode(outcast.nodeName(), !keepPreExistingConnections);
+        knockOutNode(outcast);
 
-        stopDroppingMessagesTo(outcast.nodeName());
+        stopDroppingMessagesTo(outcast);
 
         CompletableFuture<Void> sendFromOutcast = 
outcast.messagingService().send(notOutcastNode, 
messageFactory.testMessage().build());
         assertThat(sendFromOutcast, 
either(willThrow(HandshakeException.class)).or(willThrow(RecipientLeftException.class)));
@@ -600,7 +597,7 @@ class ItScaleCubeNetworkMessagingTest {
 
         establishConnection(sender, receiver);
 
-        closeAllChannels(sender.messagingService());
+        closeAllChannels(sender);
 
         receiver.messagingService().addMessageHandler(
                 TestMessageTypes.class,
@@ -691,7 +688,7 @@ class ItScaleCubeNetworkMessagingTest {
         return nettySenderWarp[0];
     }
 
-    private void collectReceivedPayloads(ClusterService sender, ClusterService 
receiver, List<String> receivedPayloads) {
+    private static void collectReceivedPayloads(ClusterService sender, 
ClusterService receiver, List<String> receivedPayloads) {
         receiver.messagingService().addMessageHandler(
                 TestMessageTypes.class,
                 (message, senderParam, correlationId) -> {
@@ -952,7 +949,7 @@ class ItScaleCubeNetworkMessagingTest {
 
         assertTrue(blockingStarted.await(10, SECONDS));
 
-        knockOutNode(sender.nodeName(), false);
+        knockOutNode(sender);
 
         canProceed.countDown();
 
@@ -964,10 +961,8 @@ class ItScaleCubeNetworkMessagingTest {
         assertThat(messagesDelivered.get(), is(1));
     }
 
-    private static void closeAllChannels(MessagingService messagingService) {
-        ConnectionManager connectionManager = ((DefaultMessagingService) 
messagingService).connectionManager();
-
-        for (NettySender sender : connectionManager.channels().values()) {
+    private static void closeAllChannels(ClusterService clusterService) {
+        for (NettySender sender : 
connectionManager(clusterService).channels().values()) {
             assertThat(sender.closeAsync(), willCompleteSuccessfully());
         }
     }
@@ -1031,11 +1026,7 @@ class ItScaleCubeNetworkMessagingTest {
     }
 
     private static CompletableFuture<Void> ackFuture(ClusterService 
clusterService, NetworkMessage msg) {
-        DefaultMessagingService messagingService = (DefaultMessagingService) 
clusterService.messagingService();
-
-        ConnectionManager connectionManager = 
messagingService.connectionManager();
-
-        for (NettySender sender : connectionManager.channels().values()) {
+        for (NettySender sender : 
connectionManager(clusterService).channels().values()) {
             for (OutNetworkObject outMsgWrapper : 
sender.recoveryDescriptor().unacknowledgedMessages()) {
                 if (outMsgWrapper.networkMessage() == msg) {
                     return outMsgWrapper.acknowledgedFuture();
@@ -1054,9 +1045,8 @@ class ItScaleCubeNetworkMessagingTest {
 
     private static OutgoingAcknowledgementSilencer 
dropAcksWhenDefaultChannelOpens(ClusterService clusterService)
             throws InterruptedException {
-        DefaultMessagingService messagingService = (DefaultMessagingService) 
clusterService.messagingService();
+        ConnectionManager connectionManager = 
connectionManager(clusterService);
 
-        ConnectionManager connectionManager = 
messagingService.connectionManager();
         assertTrue(
                 waitForCondition(
                         () -> 
connectionManager.channels().keySet().stream().anyMatch(key -> key.type() == 
ChannelType.DEFAULT),
@@ -1099,7 +1089,7 @@ class ItScaleCubeNetworkMessagingTest {
 
         CompletableFuture<Void> ackSendFuture = ackFuture(notOutcast, msg);
 
-        knockOutNode(outcast.nodeName(), false);
+        knockOutNode(outcast);
 
         assertThat(ackSendFuture, willThrow(RecipientLeftException.class));
     }
@@ -1209,53 +1199,36 @@ class ItScaleCubeNetworkMessagingTest {
         );
     }
 
-    private void knockOutNode(String outcastName, boolean 
closeConnectionsForcibly) throws InterruptedException {
-        CountDownLatch disappeared = new 
CountDownLatch(testCluster.members.size() - 1);
-
-        TopologyEventHandler disappearListener = new TopologyEventHandler() {
-            @Override
-            public void onDisappeared(InternalClusterNode member) {
-                if (Objects.equals(member.name(), outcastName)) {
-                    disappeared.countDown();
-                }
-            }
-        };
-
+    private void knockOutNode(ClusterService outcast) {
         List<ClusterService> notOutcasts = testCluster.members.stream()
-                .filter(service -> !outcastName.equals(service.nodeName()))
+                .filter(service -> outcast != service)
                 .collect(toList());
 
-        notOutcasts.forEach(clusterService -> {
-            
clusterService.topologyService().addEventHandler(disappearListener);
-        });
-
         notOutcasts.forEach(service -> {
             DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
-            messagingService.dropMessages((recipientConsistentId, message) -> 
outcastName.equals(recipientConsistentId));
+            messagingService.dropMessages((recipientConsistentId, message) -> 
outcast.nodeName().equals(recipientConsistentId));
         });
 
-        // Wait until all nodes see disappearance of the outcast.
-        assertTrue(disappeared.await(10, SECONDS), "Node did not disappear in 
time");
+        UUID outcastId = outcast.topologyService().localMember().id();
 
-        if (closeConnectionsForcibly) {
-            MessagingService messagingService = testCluster.members.stream()
-                    .filter(service -> outcastName.equals(service.nodeName()))
-                    .findFirst().orElseThrow()
-                    .messagingService();
-
-            // Forcefully close channels, so that nodes will create new 
channels on reanimation of the outcast.
-            closeAllChannels(messagingService);
-        }
+        // Wait for the remote nodes to close their channels automatically.
+        await().until(() ->
+                testCluster.members.stream()
+                        .flatMap(clusterService -> 
connectionManager(clusterService).channels().values().stream())
+                        .filter(sender -> sender.launchId().equals(outcastId))
+                        .findAny()
+                        .isEmpty()
+        );
     }
 
-    private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String 
outcastName) {
+    private IgniteBiTuple<CountDownLatch, AtomicBoolean> 
reanimateNode(ClusterService outcast) {
         CountDownLatch ready = new CountDownLatch(1);
         AtomicBoolean reappeared = new AtomicBoolean(false);
 
         testCluster.members.get(0).topologyService().addEventHandler(new 
TopologyEventHandler() {
             @Override
             public void onAppeared(InternalClusterNode member) {
-                if (Objects.equals(member.name(), outcastName)) {
+                if (Objects.equals(member.name(), outcast.nodeName())) {
                     reappeared.compareAndSet(false, true);
 
                     ready.countDown();
@@ -1277,14 +1250,14 @@ class ItScaleCubeNetworkMessagingTest {
 
         logInspectors.forEach(LogInspector::start);
 
-        stopDroppingMessagesTo(outcastName);
+        stopDroppingMessagesTo(outcast);
 
         return new IgniteBiTuple<>(ready, reappeared);
     }
 
-    private void stopDroppingMessagesTo(String outcastName) {
+    private void stopDroppingMessagesTo(ClusterService outcast) {
         testCluster.members.stream()
-                .filter(service -> !outcastName.equals(service.nodeName()))
+                .filter(service -> outcast != service)
                 .forEach(service -> {
                     DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
                     messagingService.stopDroppingMessages();
@@ -1372,7 +1345,7 @@ class ItScaleCubeNetworkMessagingTest {
          * Creates a test cluster with the given amount of members.
          *
          * @param numOfNodes Amount of cluster members.
-         * @param testInfo   Test info.
+         * @param testInfo Test info.
          */
         Cluster(int numOfNodes, TestInfo testInfo) {
             this(numOfNodes, testInfo, normalClusterIdSupplierFactory);
@@ -1382,7 +1355,7 @@ class ItScaleCubeNetworkMessagingTest {
          * Creates a test cluster with the given amount of members.
          *
          * @param numOfNodes Amount of cluster members.
-         * @param testInfo   Test info.
+         * @param testInfo Test info.
          * @param clusterIdSupplierFactory Allows to obtain a Supplier for 
cluster ID by node address.
          */
         Cluster(int numOfNodes, TestInfo testInfo, Function<NetworkAddress, 
ClusterIdSupplier> clusterIdSupplierFactory) {
@@ -1393,7 +1366,7 @@ class ItScaleCubeNetworkMessagingTest {
          * Creates a test cluster with the given amount of members.
          *
          * @param numOfNodes Amount of cluster members.
-         * @param testInfo   Test info.
+         * @param testInfo Test info.
          * @param clusterIdSupplierFactory Allows to obtain a Supplier for 
cluster ID by node address.
          */
         Cluster(
@@ -1445,7 +1418,7 @@ class ItScaleCubeNetworkMessagingTest {
          * Starts and waits for the cluster to come up.
          *
          * @throws InterruptedException If failed.
-         * @throws AssertionError       If the cluster was unable to start in 
3 seconds.
+         * @throws AssertionError If the cluster was unable to start in 3 
seconds.
          */
         void startAwait() throws InterruptedException {
             assertThat(startAsync(new ComponentContext(), members), 
willCompleteSuccessfully());
@@ -1475,7 +1448,7 @@ class ItScaleCubeNetworkMessagingTest {
         }
     }
 
-    private enum SendOperation {
+    enum SendOperation {
         SEND {
             @Override
             CompletableFuture<Void> send(MessagingService messagingService, 
NetworkMessage message, InternalClusterNode recipient) {

Reply via email to