This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d463d94f4c8 IGNITE-26877 Fixed flaky ServiceAwarenessTest.testNodesLeaveMultiThreaded (#12488)
d463d94f4c8 is described below
commit d463d94f4c820306d427164a18a26a7b8e7bab7e
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sun Nov 2 19:05:19 2025 +0300
IGNITE-26877 Fixed flaky ServiceAwarenessTest.testNodesLeaveMultiThreaded (#12488)
---
.../internal/client/thin/ServiceAwarenessTest.java | 197 +++++++++++----------
1 file changed, 99 insertions(+), 98 deletions(-)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
index 53d36b30752..ebfdb0e6f24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
@@ -24,9 +24,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -75,11 +77,17 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
* Checks the service awareness feature of the thin client.
*/
public class ServiceAwarenessTest extends AbstractThinClientTest {
+ /** */
+ private static final String ATTR_NODE_IDX = "test.node.idx";
+
/** Node-filter service name. */
private static final String SRV_NAME = "node_filtered_svc";
/** Number of grids at the test start. */
- private static final int GRIDS = 4;
+ private static final int BASE_NODES_CNT = 4;
+
+ /** */
+ private static final int TOP_UPD_NODES_CNT = 3;
/** Number of node instances with the initial service deployment. */
private static final int INIT_SRVC_NODES_CNT = 2;
@@ -95,6 +103,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDiscoverySpi(new TestBlockingDiscoverySpi());
+ cfg.setUserAttributes(Collections.singletonMap(ATTR_NODE_IDX,
getTestIgniteInstanceIndex(igniteInstanceName)));
return cfg;
}
@@ -147,7 +156,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- startGrids(GRIDS);
+ startGrids(BASE_NODES_CNT);
grid(1).services().deploy(serviceCfg());
}
@@ -160,7 +169,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
// Service topology on the client.
Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
- addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+ registerServiceTopologyUpdateListener(srvcTopOnClient::addAll);
AtomicBoolean svcRunFlag = new AtomicBoolean(true);
@@ -180,13 +189,13 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
// Delays service redeployment and the service topology update on
the server side.
testDisco.toBlock.add(ServiceClusterDeploymentResultBatch.class);
- startGrid(GRIDS);
+ startGrid(BASE_NODES_CNT);
waitForCondition(() -> testDisco.blocked.size() == 1,
getTestTimeout());
// Ensure all the nodes have started but the service topology
hasn't updated yet.
for (Ignite ig : G.allGrids()) {
- assertEquals(ig.cluster().nodes().size(), GRIDS + 1);
+ assertEquals(ig.cluster().nodes().size(), BASE_NODES_CNT + 1);
// Ensure there are still SRVC_FILTERED_NOIDES_CNT nodes with
the service instance.
assertEquals(((IgniteEx)ig).context().service().serviceTopology(SRV_NAME,
0).size(),
@@ -195,7 +204,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
// Ensure the client's topology is not updated.
assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT
- && !srvcTopOnClient.contains(grid(GRIDS).localNode().id()));
+ &&
!srvcTopOnClient.contains(grid(BASE_NODES_CNT).localNode().id()));
testDisco.release();
@@ -216,7 +225,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
waitForCondition(() -> srvcTopOnClient.size() == 3 &&
srvcTopOnClient.contains(grid(1).localNode().id())
&& srvcTopOnClient.contains(grid(2).localNode().id())
- && srvcTopOnClient.contains(grid(GRIDS).localNode().id()),
getTestTimeout());
+ &&
srvcTopOnClient.contains(grid(BASE_NODES_CNT).localNode().id()),
getTestTimeout());
}
finally {
svcRunFlag.set(false);
@@ -228,7 +237,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
*/
@Test
public void testNodesJoinSingleThreaded() throws Exception {
- doTestClusterTopChangesWhileServiceCalling(3, true, false);
+ doTestClusterTopChangesWhileServiceCalling(false, 1);
}
/**
@@ -236,7 +245,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
*/
@Test
public void testNodesJoinMultiThreaded() throws Exception {
- doTestClusterTopChangesWhileServiceCalling(3, true, true);
+ doTestClusterTopChangesWhileServiceCalling(false, 4);
}
/**
@@ -244,7 +253,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
*/
@Test
public void testNodesLeaveSingleThreaded() throws Exception {
- doTestClusterTopChangesWhileServiceCalling(3, false, false);
+ doTestClusterTopChangesWhileServiceCalling(true, 1);
}
/**
@@ -252,7 +261,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
*/
@Test
public void testNodesLeaveMultiThreaded() throws Exception {
- doTestClusterTopChangesWhileServiceCalling(3, false, true);
+ doTestClusterTopChangesWhileServiceCalling(true, 4);
}
/**
@@ -265,7 +274,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
- addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+ registerServiceTopologyUpdateListener(srvcTopOnClient::addAll);
((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
@@ -306,7 +315,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
- addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+ registerServiceTopologyUpdateListener(srvcTopOnClient::addAll);
((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
@@ -341,7 +350,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
waitForCondition(() -> {
svc.testMethod();
- return srvcTopOnClient.size() == GRIDS;
+ return srvcTopOnClient.size() == BASE_NODES_CNT;
}, getTestTimeout());
for (Ignite ig : G.allGrids())
@@ -350,29 +359,22 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
}
/** */
- private void doTestClusterTopChangesWhileServiceCalling(
- int nodesCnt,
- boolean addNodes,
- boolean multiThreaded)
- throws Exception {
- assert nodesCnt > 0;
-
- Set<UUID> newNodesUUIDs = new GridConcurrentHashSet<>();
-
+ private void doTestClusterTopChangesWhileServiceCalling(boolean shrinkTop,
int svcInvokeThreads) throws Exception {
// Start additional nodes to stop them.
- if (!addNodes) {
- startGridsMultiThreaded(GRIDS, nodesCnt);
-
- for (int i = GRIDS; i < GRIDS + nodesCnt; ++i)
- newNodesUUIDs.add(grid(i).localNode().id());
+ if (shrinkTop) {
+ for (int nodeIdx = BASE_NODES_CNT; nodeIdx < BASE_NODES_CNT +
TOP_UPD_NODES_CNT; nodeIdx++)
+ startGrid(nodeIdx);
}
- // Service topology on the clients.
- Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>();
+ Set<UUID> expInitSvcTop = resolveServiceTopology();
+
+ assertEquals(shrinkTop ? 5 : 2, expInitSvcTop.size());
- addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll);
+ // Last detected service topology on the client side.
+ AtomicReference<Set<UUID>> svcTop = new AtomicReference<>();
+
+ registerServiceTopologyUpdateListener(svcTop::set);
- AtomicBoolean changeClusterTop = new AtomicBoolean();
AtomicBoolean stopFlag = new AtomicBoolean();
try (IgniteClient client = startClient()) {
@@ -380,64 +382,61 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
((GridTestLog4jLogger)log).setLevel(Level.DEBUG);
- IgniteInternalFuture<?> runFut = runMultiThreadedAsync(() -> {
- do {
- try {
- svc.testMethod();
- }
- catch (ClientException e) {
- String m = e.getMessage();
-
- // TODO: IGNITE-20802 : Exception should not occur.
- // Client doesn't retry service invocation if the
redirected-to service instance node leaves cluster.
- if (addNodes || (!m.contains("Node has left grid") &&
!m.contains("Failed to send job due to node failure"))
- || newNodesUUIDs.stream().noneMatch(nid ->
m.contains(nid.toString())))
- throw e;
- }
- }
- while (!stopFlag.get());
- }, multiThreaded ? 4 : 1, "ServiceTestLoader");
-
- while (!stopFlag.get()) {
- // Wait until the initial topology is received.
- if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT
: INIT_SRVC_NODES_CNT + nodesCnt)
- && changeClusterTop.compareAndSet(false, true)) {
- srvcTopOnClient.clear();
-
- for (int i = 0; i < nodesCnt; ++i) {
- int nodeIdx = GRIDS + i;
-
- runAsync(() -> {
- try {
- if (addNodes)
-
newNodesUUIDs.add(startGrid(nodeIdx).localNode().id());
- else
- stopGrid(nodeIdx);
- }
- catch (Exception e) {
- log.error("Unable to start or stop test
grid.", e);
-
- stopFlag.set(true);
- }
- });
+ IgniteInternalFuture<?> svcInvokeFut = runMultiThreadedAsync(
+ () -> {
+ do {
+ try {
+ svc.testMethod();
+ }
+ catch (Exception e) {
+ String errMsg = e.getMessage();
+
+ // TODO: IGNITE-20802 : Exception should not occur.
+ // Client doesn't retry service invocation if the
redirected-to service instance node leaves cluster.
+ boolean isErrCausedByNodeLeave =
errMsg.contains("Failed to execute task due to grid shutdown")
+ || (errMsg.contains("Node has left grid") ||
errMsg.contains("Failed to send job due to node failure"))
+ && expInitSvcTop.stream().anyMatch(id ->
errMsg.contains(id.toString()));
+
+ assertTrue(shrinkTop && isErrCausedByNodeLeave);
+ }
}
- }
+ while (!stopFlag.get());
+ },
+ svcInvokeThreads,
+ "ServiceTestLoader"
+ );
- // Stop if new excepted service topology received.
- if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT
+ nodesCnt : INIT_SRVC_NODES_CNT))
- stopFlag.set(true);
+ assertTrue(waitForCondition(() ->
expInitSvcTop.equals(svcTop.get()), getTestTimeout()));
- Thread.sleep(10);
+ Collection<IgniteInternalFuture<?>> topUpdFuts =
ConcurrentHashMap.newKeySet();
+
+ for (int i = 0; i < TOP_UPD_NODES_CNT; ++i) {
+ int nodeIdx = BASE_NODES_CNT + i;
+
+ topUpdFuts.add(runAsync(() -> {
+ if (shrinkTop)
+ stopGrid(nodeIdx);
+ else
+ startGrid(nodeIdx);
+ }));
}
- runFut.get();
- }
+ for (IgniteInternalFuture<?> topUpdFut : topUpdFuts)
+ topUpdFut.get(getTestTimeout());
+
+ Set<UUID> expUpdSvcTop = resolveServiceTopology();
+
+ assertEquals(shrinkTop ? 2 : 5, expUpdSvcTop.size());
- // The initial nodes must always persist it the service topology.
- assertTrue(srvcTopOnClient.contains(grid(1).localNode().id())
- && srvcTopOnClient.contains(grid(2).localNode().id()));
+ assertTrue(waitForCondition(() ->
expUpdSvcTop.equals(svcTop.get()), getTestTimeout()));
- assertEquals(addNodes ? nodesCnt : 0,
newNodesUUIDs.stream().filter(srvcTopOnClient::contains).count());
+ stopFlag.set(true);
+
+ svcInvokeFut.get(getTestTimeout());
+ }
+ finally {
+ stopFlag.set(true);
+ }
}
/**
@@ -502,7 +501,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
? top
: grp.stream().filter(nid -> new
TestNodeFilter().apply(grid(0).cluster().node(nid))).collect(Collectors.toSet());
- addSrvcTopUpdateClientLogLsnr(uuids -> {
+ registerServiceTopologyUpdateListener(uuids -> {
// Reset counters on the first topology update.
if (top.isEmpty())
redirectCnt.set(0);
@@ -590,7 +589,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
}
/** Extracts ids of received service instance nodes from the client log. */
- private static void addSrvcTopUpdateClientLogLsnr(Consumer<Set<UUID>>
srvTopConsumer) {
+ private static void
registerServiceTopologyUpdateListener(Consumer<Set<UUID>> srvTopConsumer) {
clientLogLsnr.registerListener(s -> {
if (s.contains("Topology of service '" + SRV_NAME + "' has been
updated. The service instance nodes: ")) {
String nodes = s.substring(s.lastIndexOf(": [") + 3,
s.length() - 2);
@@ -612,6 +611,15 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
getClientConfiguration(grid(0)));
}
+ /** */
+ private static Set<UUID> resolveServiceTopology() {
+ return G.allGrids().stream()
+ .map(g -> g.cluster().localNode())
+ .filter(TestNodeFilter::test)
+ .map(ClusterNode::id)
+ .collect(Collectors.toSet());
+ }
+
/**
* Accepts nodes with the name index equal to 1, 2 or >= GRIDS.
*/
@@ -621,21 +629,14 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
- String nodeName = node.attribute("org.apache.ignite.ignite.name");
-
- if (F.isEmpty(nodeName))
- return false;
-
- int nodeIdx = -1;
+ return test(node);
+ }
- try {
- nodeIdx =
Integer.parseInt(nodeName.substring(nodeName.length() - 1));
- }
- catch (Exception e) {
- // No-op.
- }
+ /** */
+ static boolean test(ClusterNode node) {
+ int nodeIdx = node.attribute(ATTR_NODE_IDX);
- return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= GRIDS;
+ return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= BASE_NODES_CNT;
}
}