This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e5305d76fb3 IGNITE-27911 Fixed flaky ContinuousQueryBuffersCleanupTest
(#12769)
e5305d76fb3 is described below
commit e5305d76fb37cf5ec8a83e3850c03762bbda3b7e
Author: Mikhail Petrov <[email protected]>
AuthorDate: Fri Feb 20 00:57:59 2026 +0300
IGNITE-27911 Fixed flaky ContinuousQueryBuffersCleanupTest (#12769)
---
...ils.java => TestCacheContinuousQueryUtils.java} | 2 +-
.../ContinuousQueryBuffersCleanupTest.java | 27 ++++++++++++----------
2 files changed, 16 insertions(+), 13 deletions(-)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheConrinuousQueryUtils.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheContinuousQueryUtils.java
similarity index 97%
rename from
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheConrinuousQueryUtils.java
rename to
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheContinuousQueryUtils.java
index 9f188200561..61d1bfba4ca 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheConrinuousQueryUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheContinuousQueryUtils.java
@@ -21,7 +21,7 @@ import java.util.Map;
import org.apache.ignite.internal.util.GridAtomicLong;
/** */
-public class TestCacheConrinuousQueryUtils {
+public class TestCacheContinuousQueryUtils {
/** */
public static Map<Integer, CacheContinuousQueryEventBuffer>
partitionContinuesQueryEntryBuffers(
CacheContinuousQueryHandler<?, ?> hnd
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java
index 26187fe8050..8f453ca4a87 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java
@@ -29,7 +29,6 @@ import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
@@ -39,6 +38,7 @@ import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer;
@@ -48,14 +48,16 @@ import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
import static
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD;
-import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.backupQueueSize;
-import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.bufferedEntries;
-import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.maxReceivedBackupAcknowledgeUpdateCounter;
-import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.partitionContinuesQueryEntryBuffers;
+import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.backupQueueSize;
+import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.bufferedEntries;
+import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.maxReceivedBackupAcknowledgeUpdateCounter;
+import static
org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.partitionContinuesQueryEntryBuffers;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** */
@@ -66,6 +68,7 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
+ .setUserAttributes(singletonMap(IDX_ATTR,
getTestIgniteInstanceIndex(igniteInstanceName)))
.setCommunicationSpi(new TestRecordingCommunicationSpi()
.setAckSendThreshold(1));
}
@@ -77,11 +80,11 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
/** */
@Test
- public void
testBackupUpdateAndBackupContinuusQueryAcknowledgmentReordered() throws
Exception {
+ public void
testBackupUpdateAndBackupContinuousQueryAcknowledgmentReordered() throws
Exception {
startGrids(2);
IgniteCache<Object, Object> cache = grid(0).createCache(new
CacheConfiguration<>()
- .setAffinity(new RendezvousAffinityFunction().setPartitions(2))
+ .setAffinity(new GridCacheModuloAffinityFunction(2, 1))
.setName(DEFAULT_CACHE_NAME)
.setBackups(1));
@@ -97,7 +100,7 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
// Here we are waiting for initialization of CQ entry buffers on
backup node, otherwise backup node will ignore
// acknowledgements.
- assertTrue(waitForCondition(() ->
!continuosQueryEntryBuffers(grid(1)).isEmpty(), getTestTimeout()));
+ assertTrue(waitForCondition(() ->
!continuousQueryEntryBuffers(grid(1)).isEmpty(), getTestTimeout()));
spi(grid(0)).blockMessages(GridDhtAtomicSingleUpdateRequest.class,
grid(1).name());
@@ -111,7 +114,7 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
assertTrue(cqListenerNotifiedLatch.await(getTestTimeout(),
MILLISECONDS));
- CacheContinuousQueryEventBuffer buf =
continuosQueryEntryBuffers(grid(1)).get(part);
+ CacheContinuousQueryEventBuffer buf =
continuousQueryEntryBuffers(grid(1)).get(part);
assertTrue(waitForCondition(() ->
maxReceivedBackupAcknowledgeUpdateCounter(buf).get() == 2, getTestTimeout()));
@@ -129,7 +132,7 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
try (IgniteClient thinCli = Ignition.startClient(new
ClientConfiguration().setAddresses("127.0.0.1:10800"))) {
grid(0).createCache(new CacheConfiguration<Integer, Integer>()
- .setAffinity(new RendezvousAffinityFunction().setPartitions(2))
+ .setAffinity(new GridCacheModuloAffinityFunction(2, 1))
.setName(DEFAULT_CACHE_NAME)
.setWriteSynchronizationMode(FULL_SYNC)
.setBackups(1));
@@ -149,7 +152,7 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
int cacheOpRounds =
Math.round(DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD * 0.3f);
- // We repeatedly perform 5 cache operations that raise CREATE, UPDATE,
REMOVED, EXPIRIED events.
+ // We repeatedly perform 5 cache operations that raise CREATE, UPDATE,
REMOVED, EXPIRED events.
// The total number of events is selected in a such way as to check
for a backup notification due to a
// buffer overflow, and then by timeout.
int expEvtsCnt = cacheOpRounds * 5;
@@ -215,7 +218,7 @@ public class ContinuousQueryBuffersCleanupTest extends
GridCommonAbstractTest {
}
/** */
- private Map<Integer, CacheContinuousQueryEventBuffer>
continuosQueryEntryBuffers(IgniteEx ignite) {
+ private Map<Integer, CacheContinuousQueryEventBuffer>
continuousQueryEntryBuffers(IgniteEx ignite) {
GridContinuousProcessor contProc = ignite.context().continuous();
return partitionContinuesQueryEntryBuffers(