This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 969f21a3aa8 IGNITE-28041 Fixed flaky ClientSlowDiscoveryAbstractTest
(#12845)
969f21a3aa8 is described below
commit 969f21a3aa877ee970d9e77750b031596ded1926
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu Mar 5 14:59:10 2026 +0300
IGNITE-28041 Fixed flaky ClientSlowDiscoveryAbstractTest (#12845)
---
.../cache/ClientSlowDiscoveryAbstractTest.java | 66 +++++++--------
.../ClientSlowDiscoveryTopologyChangeTest.java | 99 ++++++++++------------
.../ClientSlowDiscoveryTransactionRemapTest.java | 40 +++------
3 files changed, 93 insertions(+), 112 deletions(-)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
index 7b437382a48..d81ef052420 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.function.Supplier;
-import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -26,52 +24,49 @@ import
org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-/**
- *
- */
-public class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/** */
+abstract class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
/** Cache name. */
protected static final String CACHE_NAME = "cache";
- /** Cache configuration. */
- private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
- .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
- .setReadFromBackup(false)
- .setBackups(1)
- .setAffinity(new RendezvousAffinityFunction(false, 64));
-
- /** Communication SPI supplier. */
- protected Supplier<CommunicationSpi> communicationSpiSupplier =
TestRecordingCommunicationSpi::new;
-
- /** Discovery SPI supplier. */
- protected Supplier<DiscoverySpi> discoverySpiSupplier =
TcpDiscoverySpi::new;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setReadFromBackup(false)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 64)));
+ }
+
+ /** */
+ protected IgniteConfiguration getConfiguration(int nodeIdx,
TcpDiscoverySpi discoSpi) throws Exception {
+ IgniteConfiguration cfg =
getConfiguration(getTestIgniteInstanceName(nodeIdx));
- cfg.setConsistentId(igniteInstanceName);
- cfg.setCacheConfiguration(ccfg);
- cfg.setCommunicationSpi(communicationSpiSupplier.get());
- cfg.setDiscoverySpi(discoverySpiSupplier.get());
+
cfg.setDiscoverySpi(discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
return cfg;
}
- /**
- *
- */
+ /** */
static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
/** Interceptor. */
- protected volatile IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage>
interceptor;
+ private final IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage>
interceptor;
+
+ /** */
+
NodeJoinInterceptingDiscoverySpi(IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage>
interceptor) {
+ this.interceptor = interceptor;
+ }
/** {@inheritDoc} */
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
@@ -80,12 +75,15 @@ public class ClientSlowDiscoveryAbstractTest extends
GridCommonAbstractTest {
}
}
- /**
- *
- */
+ /** */
static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi
{
/** Interceptor. */
- protected volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
+ private final IgniteInClosure<DiscoveryCustomMessage> interceptor;
+
+ /** */
+
CustomMessageInterceptingDiscoverySpi(IgniteInClosure<DiscoveryCustomMessage>
interceptor) {
+ this.interceptor = interceptor;
+ }
/** {@inheritDoc} */
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
index 6a3c0214096..a202bf89a7e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -28,31 +29,22 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-/**
- *
- */
+/** */
public class ClientSlowDiscoveryTopologyChangeTest extends
ClientSlowDiscoveryAbstractTest {
- /**
- *
- */
- @Before
- public void before() throws Exception {
+ /** {@inheritDoc} */
+ @Override public void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
- /**
- *
- */
- @After
- public void after() throws Exception {
+ /** {@inheritDoc} */
+ @Override public void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
@@ -75,53 +67,25 @@ public class ClientSlowDiscoveryTopologyChangeTest extends
ClientSlowDiscoveryAb
for (int k = 0; k < 64; k++)
crd.cache(CACHE_NAME).put(k, k);
- TestRecordingCommunicationSpi clientCommSpi = new
TestRecordingCommunicationSpi();
-
- // Delay client join process.
- clientCommSpi.blockMessages((node, msg) -> {
- if (!(msg instanceof GridDhtPartitionsSingleMessage))
- return false;
+ CountDownLatch cliDiscoSpiUnblockedLatch = new CountDownLatch(1);
- GridDhtPartitionsSingleMessage singleMsg =
(GridDhtPartitionsSingleMessage)msg;
-
- return Optional.ofNullable(singleMsg.exchangeId())
- .map(GridDhtPartitionExchangeId::topologyVersion)
- .filter(topVer -> topVer.equals(new AffinityTopologyVersion(4,
0)))
- .isPresent();
- });
-
- communicationSpiSupplier = () -> clientCommSpi;
-
- CustomMessageInterceptingDiscoverySpi clientDiscoSpi = new
CustomMessageInterceptingDiscoverySpi();
-
- CountDownLatch clientDiscoSpiBlock = new CountDownLatch(1);
-
- // Delay cache destroying on client node.
- clientDiscoSpi.interceptor = (msg) -> {
- if (!(msg instanceof DynamicCacheChangeBatch))
- return;
-
- DynamicCacheChangeBatch cacheChangeBatch =
(DynamicCacheChangeBatch)msg;
-
- boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
- .anyMatch(req -> req.stop() &&
req.cacheName().equals(CACHE_NAME));
+ IgniteConfiguration cliCfg = getConfiguration(3,
createBlockingDiscoverySpi(cliDiscoSpiUnblockedLatch));
- if (hasCacheStopReq)
- U.awaitQuiet(clientDiscoSpiBlock);
- };
+ TestRecordingCommunicationSpi commSpi =
(TestRecordingCommunicationSpi)cliCfg.getCommunicationSpi();
- discoverySpiSupplier = () -> clientDiscoSpi;
+ // Delay client join process.
+ blockSingleMessage(commSpi);
- IgniteInternalFuture<IgniteEx> clientStartFut =
GridTestUtils.runAsync(() -> startClientGrid(3));
+ IgniteInternalFuture<IgniteEx> clientStartFut =
GridTestUtils.runAsync(() -> startClientGrid(cliCfg));
// Wait till client node starts join process.
- clientCommSpi.waitForBlocked();
+ commSpi.waitForBlocked();
// Destroy cache on server nodes.
crd.destroyCache(CACHE_NAME);
// Resume client join.
- clientCommSpi.stopBlock();
+ commSpi.stopBlock();
// Client join should succeed.
IgniteEx client = clientStartFut.get();
@@ -143,7 +107,7 @@ public class ClientSlowDiscoveryTopologyChangeTest extends
ClientSlowDiscoveryAb
}
finally {
// Resume processing cache destroy on client node.
- clientDiscoSpiBlock.countDown();
+ cliDiscoSpiUnblockedLatch.countDown();
}
// Wait till cache destroyed on client node.
@@ -157,4 +121,35 @@ public class ClientSlowDiscoveryTopologyChangeTest extends
ClientSlowDiscoveryAb
Assert.assertNull("Cache should be destroyed on client node",
client.cache(CACHE_NAME));
}
+
+ /** */
+ private TcpDiscoverySpi createBlockingDiscoverySpi(CountDownLatch
discoSpiUnblockedLatch) {
+ return new CustomMessageInterceptingDiscoverySpi(msg -> {
+ if (!(msg instanceof DynamicCacheChangeBatch))
+ return;
+
+ DynamicCacheChangeBatch cacheChangeBatch =
(DynamicCacheChangeBatch)msg;
+
+ boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
+ .anyMatch(req -> req.stop() &&
req.cacheName().equals(CACHE_NAME));
+
+ if (hasCacheStopReq)
+ U.awaitQuiet(discoSpiUnblockedLatch);
+ });
+ }
+
+ /** */
+ private void blockSingleMessage(TestRecordingCommunicationSpi commSpi) {
+ commSpi.blockMessages((node, msg) -> {
+ if (!(msg instanceof GridDhtPartitionsSingleMessage))
+ return false;
+
+ GridDhtPartitionsSingleMessage singleMsg =
(GridDhtPartitionsSingleMessage)msg;
+
+ return Optional.ofNullable(singleMsg.exchangeId())
+ .map(GridDhtPartitionExchangeId::topologyVersion)
+ .filter(topVer -> topVer.equals(new AffinityTopologyVersion(4,
0)))
+ .isPresent();
+ });
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
index d408333bc91..b1c1958f61a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
@@ -34,15 +34,12 @@ import
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionTimeoutException;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -204,9 +201,9 @@ public class ClientSlowDiscoveryTransactionRemapTest
extends ClientSlowDiscovery
/**
* Interface to work with cache operations within transaction.
*/
- private static interface TestTransaction<K, V> {
+ private interface TestTransaction<K, V> {
/** Possible operations. */
- static int POSSIBLE_OPERATIONS = 5;
+ int POSSIBLE_OPERATIONS = 5;
/**
* @param key Key.
@@ -347,7 +344,7 @@ public class ClientSlowDiscoveryTransactionRemapTest
extends ClientSlowDiscovery
public IgniteInClosure<TestTransaction<?, ?>> operation;
/** Client disco spi block. */
- private CountDownLatch clientDiscoSpiBlock;
+ private CountDownLatch cliDiscoSpiUnblockedLatch;
/** Client node to perform operations. */
private IgniteEx clnt;
@@ -368,34 +365,25 @@ public class ClientSlowDiscoveryTransactionRemapTest
extends ClientSlowDiscovery
cleanPersistenceDir();
}
- /** */
- @Before
- public void before() throws Exception {
- NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new
NodeJoinInterceptingDiscoverySpi();
-
- clientDiscoSpiBlock = new CountDownLatch(1);
+ /** {@inheritDoc} */
+ @Override public void beforeTest() throws Exception {
+ cliDiscoSpiUnblockedLatch = new CountDownLatch(1);
- // Delay node join of second client.
- clientDiscoSpi.interceptor = msg -> {
+ NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new
NodeJoinInterceptingDiscoverySpi(msg -> {
if (msg.nodeId().toString().endsWith("2"))
- U.awaitQuiet(clientDiscoSpiBlock);
- };
-
- discoverySpiSupplier = () -> clientDiscoSpi;
+ U.awaitQuiet(cliDiscoSpiUnblockedLatch);
+ });
- clnt = startClientGrid(1);
+ clnt = startClientGrid(getConfiguration(1, clientDiscoSpi));
for (int k = 0; k < 64; k++)
clnt.cache(CACHE_NAME).put(k, 0);
- discoverySpiSupplier = TcpDiscoverySpi::new;
-
startClientGrid(2);
}
- /** */
- @After
- public void after() throws Exception {
+ /** {@inheritDoc} */
+ @Override public void afterTest() throws Exception {
// Stop client nodes.
stopGrid(1);
stopGrid(2);
@@ -421,7 +409,7 @@ public class ClientSlowDiscoveryTransactionRemapTest
extends ClientSlowDiscovery
// Expected.
}
finally {
- clientDiscoSpiBlock.countDown();
+ cliDiscoSpiUnblockedLatch.countDown();
}
// After resume second client join, transaction should successfully
await new affinity and commit.
@@ -451,7 +439,7 @@ public class ClientSlowDiscoveryTransactionRemapTest
extends ClientSlowDiscovery
// Expected.
}
finally {
- clientDiscoSpiBlock.countDown();
+ cliDiscoSpiUnblockedLatch.countDown();
}
// After resume second client join, transaction should be timed out
and rolled back.