This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c75818740e7 IGNITE-21713 Data streamer does not fire all the CQ events
(#11269)
c75818740e7 is described below
commit c75818740e7c0419eb2d0c550cf6960707f9e32a
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Mon Mar 11 15:21:32 2024 +0300
IGNITE-21713 Data streamer does not fire all the CQ events (#11269)
---
.../continuous/CacheContinuousQueryHandler.java | 8 +--
.../continuous/GridContinuousProcessor.java | 24 ++++---
.../CacheContinuousQueryCounterAbstractTest.java | 77 ++++++++++++++++++++++
3 files changed, 92 insertions(+), 17 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 46e6cebc461..113ab773680 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1212,10 +1212,10 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
else if (initUpdCntrs != null)
partCntrs = initUpdCntrs.get(partId);
- rec = new
CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
topVer,
- partCntrs != null ? partCntrs.get2() : null);
-
- CacheContinuousQueryPartitionRecovery oldRec =
rcvs.putIfAbsent(partId, rec);
+ T2<Long, Long> partCntrs0 = partCntrs;
+ CacheContinuousQueryPartitionRecovery oldRec =
rcvs.computeIfAbsent(partId, k ->
+ new
CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
topVer,
+ partCntrs0 != null ? partCntrs0.get2() : null));
if (oldRec != null)
rec = oldRec;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index bfc39f9eab4..dd8bd564a68 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -891,15 +891,6 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
// Whether local node is included in routine.
boolean locIncluded = prjPred == null ||
prjPred.apply(ctx.discovery().localNode());
- AbstractContinuousMessage msg;
-
- try {
- msg = createStartMessage(routineId, hnd, bufSize, interval,
autoUnsubscribe, prjPred);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
-
// Register per-routine notifications listener if ordered messaging is
used.
registerMessageListener(hnd);
@@ -922,6 +913,8 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
true);
}
+ AbstractContinuousMessage msg = createStartMessage(routineId,
hnd, bufSize, interval, autoUnsubscribe, prjPred);
+
ctx.discovery().sendCustomEvent(msg);
}
catch (IgniteCheckedException e) {
@@ -1001,10 +994,15 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
reqData.p2pMarshal(marsh);
}
- return new StartRoutineDiscoveryMessage(
- routineId,
- reqData,
- reqData.handler().keepBinary());
+ StartRoutineDiscoveryMessage msg = new
StartRoutineDiscoveryMessage(
+ routineId,
+ reqData,
+ reqData.handler().keepBinary());
+
+ if (hnd.updateCounters() != null)
+ msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
+
+ return msg;
}
else {
assert discoProtoVer == 2 : discoProtoVer;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
index c86030e7f56..e9dff773949 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -31,9 +31,11 @@ import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
@@ -42,6 +44,9 @@ import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -56,6 +61,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Continuous queries counter tests.
@@ -76,6 +82,8 @@ public abstract class CacheContinuousQueryCounterAbstractTest
extends GridCommon
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+ cfg.setDataStreamerThreadPoolSize(2);
+
return cfg;
}
@@ -227,6 +235,75 @@ public abstract class
CacheContinuousQueryCounterAbstractTest extends GridCommon
}
}
+ /**
+ * The main idea of the test is to emulate entries reordering after update
counter is already defined.
+ * Thus we can obtain a situation where entries from the same partition have
already obtained update counters but are finally registered in a different
+ * order due to some pauses in data streamer striped pool threads.
A special latch on the event is used for race emulation.
+ * This test relies on the assumption that {@link
org.apache.ignite.internal.processors.cache.GridCacheMapEntry#innerSet} raises
+ * {@code EVT_CACHE_OBJECT_PUT} after update counter was invoked.
+ */
+ @Test
+ public void testDataStreamerItemsReordered() throws
IgniteInterruptedCheckedException {
+ AtomicInteger partitionWithSlowThread = new AtomicInteger(-1);
+ CountDownLatch partLatch = new CountDownLatch(1);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration("ds-cq-test");
+ cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 2));
+ IgniteCache<Integer, Integer> cache =
grid(0).getOrCreateCache(cacheCfg);
+
+ grid(0).events().enableLocal(EventType.EVT_CACHE_OBJECT_PUT);
+
+ grid(0).events().localListen(e -> {
+ CacheEvent ce = (CacheEvent)e;
+ if (partitionWithSlowThread.compareAndSet(-1, ce.partition())) {
+ try {
+ partLatch.await();
+ }
+ catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ if (partitionWithSlowThread.get() == ce.partition()) {
+ partLatch.countDown();
+ }
+
+ return true;
+ }, EventType.EVT_CACHE_OBJECT_PUT);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ ConcurrentHashMap<Integer, Integer> itemsHolder = new
ConcurrentHashMap<>();
+
+ qry.setLocalListener(events -> {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> evt :
events) {
+ itemsHolder.put(evt.getKey(), evt.getValue());
+ }
+ });
+
+ cache.query(qry);
+
+ int itemsToProc = gridCount() * 5000;
+
+ try (IgniteDataStreamer<Integer, Integer> stmr =
grid(0).dataStreamer("ds-cq-test")) {
+ stmr.allowOverwrite(true);
+ stmr.perNodeBufferSize(1024);
+ stmr.autoFlushFrequency(500);
+
+ // Stream entries.
+ for (int i = 0; i < itemsToProc; i++) {
+ stmr.addData(i, i);
+
+ if (i == 1024)
+ stmr.tryFlush();
+ }
+
+ stmr.flush();
+ }
+
+ assertTrue(waitForCondition(() -> itemsToProc == itemsHolder.size(),
2000));
+ }
+
/**
* @throws Exception If failed.
*/