This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d463d94f4c8 IGNITE-26877 Fixed flaky ServiceAwarenessTest.testNodesLeaveMultiThreaded (#12488)
d463d94f4c8 is described below

commit d463d94f4c820306d427164a18a26a7b8e7bab7e
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sun Nov 2 19:05:19 2025 +0300

    IGNITE-26877 Fixed flaky ServiceAwarenessTest.testNodesLeaveMultiThreaded (#12488)
---
 .../internal/client/thin/ServiceAwarenessTest.java | 197 +++++++++++----------
 1 file changed, 99 insertions(+), 98 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
index 53d36b30752..ebfdb0e6f24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
@@ -24,9 +24,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -75,11 +77,17 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
  * Checks the service awareness feature of the thin client.
  */
 public class ServiceAwarenessTest extends AbstractThinClientTest {
+    /** */
+    private static final String ATTR_NODE_IDX = "test.node.idx";
+
     /** Node-filter service name. */
     private static final String SRV_NAME = "node_filtered_svc";
 
     /** Number of grids at the test start. */
-    private static final int GRIDS = 4;
+    private static final int BASE_NODES_CNT = 4;
+
+    /** */
+    private static final int TOP_UPD_NODES_CNT = 3;
 
     /** Number of node instances with the initial service deployment. */
     private static final int INIT_SRVC_NODES_CNT = 2;
@@ -95,6 +103,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setDiscoverySpi(new TestBlockingDiscoverySpi());
+        cfg.setUserAttributes(Collections.singletonMap(ATTR_NODE_IDX, 
getTestIgniteInstanceIndex(igniteInstanceName)));
 
         return cfg;
     }
@@ -147,7 +156,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        startGrids(GRIDS);
+        startGrids(BASE_NODES_CNT);
 
         grid(1).services().deploy(serviceCfg());
     }
@@ -160,7 +169,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
         // Service topology on the client.
         Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
 
-        addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+        registerServiceTopologyUpdateListener(srvcTopOnClient::addAll);
 
         AtomicBoolean svcRunFlag = new AtomicBoolean(true);
 
@@ -180,13 +189,13 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
             // Delays service redeployment and the service topology update on 
the server side.
             testDisco.toBlock.add(ServiceClusterDeploymentResultBatch.class);
 
-            startGrid(GRIDS);
+            startGrid(BASE_NODES_CNT);
 
             waitForCondition(() -> testDisco.blocked.size() == 1, 
getTestTimeout());
 
             // Ensure all the nodes have started but the service topology 
hasn't updated yet.
             for (Ignite ig : G.allGrids()) {
-                assertEquals(ig.cluster().nodes().size(), GRIDS + 1);
+                assertEquals(ig.cluster().nodes().size(), BASE_NODES_CNT + 1);
 
                 // Ensure there are still SRVC_FILTERED_NOIDES_CNT nodes with 
the service instance.
                 
assertEquals(((IgniteEx)ig).context().service().serviceTopology(SRV_NAME, 
0).size(),
@@ -195,7 +204,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
 
             // Ensure the client's topology is not updated.
             assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT
-                && !srvcTopOnClient.contains(grid(GRIDS).localNode().id()));
+                && 
!srvcTopOnClient.contains(grid(BASE_NODES_CNT).localNode().id()));
 
             testDisco.release();
 
@@ -216,7 +225,7 @@ public class ServiceAwarenessTest extends 
AbstractThinClientTest {
 
             waitForCondition(() -> srvcTopOnClient.size() == 3 && 
srvcTopOnClient.contains(grid(1).localNode().id())
                 && srvcTopOnClient.contains(grid(2).localNode().id())
-                && srvcTopOnClient.contains(grid(GRIDS).localNode().id()), 
getTestTimeout());
+                && 
srvcTopOnClient.contains(grid(BASE_NODES_CNT).localNode().id()), 
getTestTimeout());
         }
         finally {
             svcRunFlag.set(false);
@@ -228,7 +237,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
      */
     @Test
     public void testNodesJoinSingleThreaded() throws Exception {
-        doTestClusterTopChangesWhileServiceCalling(3, true, false);
+        doTestClusterTopChangesWhileServiceCalling(false, 1);
     }
 
     /**
@@ -236,7 +245,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
      */
     @Test
     public void testNodesJoinMultiThreaded() throws Exception {
-        doTestClusterTopChangesWhileServiceCalling(3, true, true);
+        doTestClusterTopChangesWhileServiceCalling(false, 4);
     }
 
     /**
@@ -244,7 +253,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
      */
     @Test
     public void testNodesLeaveSingleThreaded() throws Exception {
-        doTestClusterTopChangesWhileServiceCalling(3, false, false);
+        doTestClusterTopChangesWhileServiceCalling(true, 1);
     }
 
     /**
@@ -252,7 +261,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
      */
     @Test
     public void testNodesLeaveMultiThreaded() throws Exception {
-        doTestClusterTopChangesWhileServiceCalling(3, false, true);
+        doTestClusterTopChangesWhileServiceCalling(true, 4);
     }
 
     /**
@@ -265,7 +274,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
 
             Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
 
-            addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+            registerServiceTopologyUpdateListener(srvcTopOnClient::addAll);
 
             ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
 
@@ -306,7 +315,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
 
             Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
 
-            addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+            registerServiceTopologyUpdateListener(srvcTopOnClient::addAll);
 
             ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
 
@@ -341,7 +350,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
             waitForCondition(() -> {
                 svc.testMethod();
 
-                return srvcTopOnClient.size() == GRIDS;
+                return srvcTopOnClient.size() == BASE_NODES_CNT;
             }, getTestTimeout());
 
             for (Ignite ig : G.allGrids())
@@ -350,29 +359,22 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
     }
 
     /** */
-    private void doTestClusterTopChangesWhileServiceCalling(
-        int nodesCnt,
-        boolean addNodes,
-        boolean multiThreaded)
-        throws Exception {
-        assert nodesCnt > 0;
-
-        Set<UUID> newNodesUUIDs = new GridConcurrentHashSet<>();
-
+    private void doTestClusterTopChangesWhileServiceCalling(boolean shrinkTop, 
int svcInvokeThreads) throws Exception {
         // Start additional nodes to stop them.
-        if (!addNodes) {
-            startGridsMultiThreaded(GRIDS, nodesCnt);
-
-            for (int i = GRIDS; i < GRIDS + nodesCnt; ++i)
-                newNodesUUIDs.add(grid(i).localNode().id());
+        if (shrinkTop) {
+            for (int nodeIdx = BASE_NODES_CNT; nodeIdx < BASE_NODES_CNT + 
TOP_UPD_NODES_CNT; nodeIdx++)
+                startGrid(nodeIdx);
         }
 
-        // Service topology on the clients.
-        Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+        Set<UUID> expInitSvcTop = resolveServiceTopology();
+
+        assertEquals(shrinkTop ? 5 : 2, expInitSvcTop.size());
 
-        addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+        // Last detected service topology on the client side.
+        AtomicReference<Set<UUID>> svcTop = new AtomicReference<>();
+
+        registerServiceTopologyUpdateListener(svcTop::set);
 
-        AtomicBoolean changeClusterTop = new AtomicBoolean();
         AtomicBoolean stopFlag = new AtomicBoolean();
 
         try (IgniteClient client = startClient()) {
@@ -380,64 +382,61 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
 
             ((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
 
-            IgniteInternalFuture<?> runFut = runMultiThreadedAsync(() -> {
-                do {
-                    try {
-                        svc.testMethod();
-                    }
-                    catch (ClientException e) {
-                        String m = e.getMessage();
-
-                        // TODO: IGNITE-20802 : Exception should not occur.
-                        // Client doesn't retry service invocation if the 
redirected-to service instance node leaves cluster.
-                        if (addNodes || (!m.contains("Node has left grid") && 
!m.contains("Failed to send job due to node failure"))
-                            || newNodesUUIDs.stream().noneMatch(nid -> 
m.contains(nid.toString())))
-                            throw e;
-                    }
-                }
-                while (!stopFlag.get());
-            }, multiThreaded ? 4 : 1, "ServiceTestLoader");
-
-            while (!stopFlag.get()) {
-                // Wait until the initial topology is received.
-                if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT 
: INIT_SRVC_NODES_CNT + nodesCnt)
-                    && changeClusterTop.compareAndSet(false, true)) {
-                    srvcTopOnClient.clear();
-
-                    for (int i = 0; i < nodesCnt; ++i) {
-                        int nodeIdx = GRIDS + i;
-
-                        runAsync(() -> {
-                            try {
-                                if (addNodes)
-                                    
newNodesUUIDs.add(startGrid(nodeIdx).localNode().id());
-                                else
-                                    stopGrid(nodeIdx);
-                            }
-                            catch (Exception e) {
-                                log.error("Unable to start or stop test 
grid.", e);
-
-                                stopFlag.set(true);
-                            }
-                        });
+            IgniteInternalFuture<?> svcInvokeFut = runMultiThreadedAsync(
+                () -> {
+                    do {
+                        try {
+                            svc.testMethod();
+                        }
+                        catch (Exception e) {
+                            String errMsg = e.getMessage();
+
+                            // TODO: IGNITE-20802 : Exception should not occur.
+                            // Client doesn't retry service invocation if the 
redirected-to service instance node leaves cluster.
+                            boolean isErrCausedByNodeLeave = 
errMsg.contains("Failed to execute task due to grid shutdown")
+                                || (errMsg.contains("Node has left grid") || 
errMsg.contains("Failed to send job due to node failure"))
+                                && expInitSvcTop.stream().anyMatch(id -> 
errMsg.contains(id.toString()));
+
+                            assertTrue(shrinkTop && isErrCausedByNodeLeave);
+                        }
                     }
-                }
+                    while (!stopFlag.get());
+                },
+                svcInvokeThreads,
+                "ServiceTestLoader"
+            );
 
-                // Stop if new excepted service topology received.
-                if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT 
+ nodesCnt : INIT_SRVC_NODES_CNT))
-                    stopFlag.set(true);
+            assertTrue(waitForCondition(() -> 
expInitSvcTop.equals(svcTop.get()), getTestTimeout()));
 
-                Thread.sleep(10);
+            Collection<IgniteInternalFuture<?>> topUpdFuts = 
ConcurrentHashMap.newKeySet();
+
+            for (int i = 0; i < TOP_UPD_NODES_CNT; ++i) {
+                int nodeIdx = BASE_NODES_CNT + i;
+
+                topUpdFuts.add(runAsync(() -> {
+                    if (shrinkTop)
+                        stopGrid(nodeIdx);
+                    else
+                        startGrid(nodeIdx);
+                }));
             }
 
-            runFut.get();
-        }
+            for (IgniteInternalFuture<?> topUpdFut : topUpdFuts)
+                topUpdFut.get(getTestTimeout());
+
+            Set<UUID> expUpdSvcTop = resolveServiceTopology();
+
+            assertEquals(shrinkTop ? 2 : 5, expUpdSvcTop.size());
 
-        // The initial nodes must always persist it the service topology.
-        assertTrue(srvcTopOnClient.contains(grid(1).localNode().id())
-            && srvcTopOnClient.contains(grid(2).localNode().id()));
+            assertTrue(waitForCondition(() -> 
expUpdSvcTop.equals(svcTop.get()), getTestTimeout()));
 
-        assertEquals(addNodes ? nodesCnt : 0, 
newNodesUUIDs.stream().filter(srvcTopOnClient::contains).count());
+            stopFlag.set(true);
+
+            svcInvokeFut.get(getTestTimeout());
+        }
+        finally {
+            stopFlag.set(true);
+        }
     }
 
     /**
@@ -502,7 +501,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
             ? top
             : grp.stream().filter(nid -> new 
TestNodeFilter().apply(grid(0).cluster().node(nid))).collect(Collectors.toSet());
 
-        addSrvcTopUpdateClientLogLsnr(uuids -> {
+        registerServiceTopologyUpdateListener(uuids -> {
             // Reset counters on the first topology update.
             if (top.isEmpty())
                 redirectCnt.set(0);
@@ -590,7 +589,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
     }
 
     /** Extracts ids of received service instance nodes from the client log. */
-    private static void addSrvcTopUpdateClientLogLsnr(Consumer<Set<UUID>> 
srvTopConsumer) {
+    private static void 
registerServiceTopologyUpdateListener(Consumer<Set<UUID>> srvTopConsumer) {
         clientLogLsnr.registerListener(s -> {
             if (s.contains("Topology of service '" + SRV_NAME + "' has been 
updated. The service instance nodes: ")) {
                 String nodes = s.substring(s.lastIndexOf(": [") + 3, 
s.length() - 2);
@@ -612,6 +611,15 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
             getClientConfiguration(grid(0)));
     }
 
+    /** */
+    private static Set<UUID> resolveServiceTopology() {
+        return G.allGrids().stream()
+            .map(g -> g.cluster().localNode())
+            .filter(TestNodeFilter::test)
+            .map(ClusterNode::id)
+            .collect(Collectors.toSet());
+    }
+
     /**
      * Accepts nodes with the name index equal to 1, 2 or >= GRIDS.
      */
@@ -621,21 +629,14 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
 
         /** {@inheritDoc} */
         @Override public boolean apply(ClusterNode node) {
-            String nodeName = node.attribute("org.apache.ignite.ignite.name");
-
-            if (F.isEmpty(nodeName))
-                return false;
-
-            int nodeIdx = -1;
+            return test(node);
+        }
 
-            try {
-                nodeIdx = 
Integer.parseInt(nodeName.substring(nodeName.length() - 1));
-            }
-            catch (Exception e) {
-                // No-op.
-            }
+        /** */
+        static boolean test(ClusterNode node) {
+            int nodeIdx = node.attribute(ATTR_NODE_IDX);
 
-            return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= GRIDS;
+            return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= BASE_NODES_CNT;
         }
     }
 

Reply via email to