This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c75818740e7 IGNITE-21713 Data streamer does not fire all the CQ events 
(#11269)
c75818740e7 is described below

commit c75818740e7c0419eb2d0c550cf6960707f9e32a
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Mon Mar 11 15:21:32 2024 +0300

    IGNITE-21713 Data streamer does not fire all the CQ events (#11269)
---
 .../continuous/CacheContinuousQueryHandler.java    |  8 +--
 .../continuous/GridContinuousProcessor.java        | 24 ++++---
 .../CacheContinuousQueryCounterAbstractTest.java   | 77 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 17 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 46e6cebc461..113ab773680 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1212,10 +1212,10 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
             else if (initUpdCntrs != null)
                 partCntrs = initUpdCntrs.get(partId);
 
-            rec = new 
CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), 
topVer,
-                partCntrs != null ? partCntrs.get2() : null);
-
-            CacheContinuousQueryPartitionRecovery oldRec = 
rcvs.putIfAbsent(partId, rec);
+            T2<Long, Long> partCntrs0 = partCntrs;
+            CacheContinuousQueryPartitionRecovery oldRec = 
rcvs.computeIfAbsent(partId, k ->
+                    new 
CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), 
topVer,
+                            partCntrs0 != null ? partCntrs0.get2() : null));
 
             if (oldRec != null)
                 rec = oldRec;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index bfc39f9eab4..dd8bd564a68 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -891,15 +891,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         // Whether local node is included in routine.
         boolean locIncluded = prjPred == null || 
prjPred.apply(ctx.discovery().localNode());
 
-        AbstractContinuousMessage msg;
-
-        try {
-            msg = createStartMessage(routineId, hnd, bufSize, interval, 
autoUnsubscribe, prjPred);
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-
         // Register per-routine notifications listener if ordered messaging is 
used.
         registerMessageListener(hnd);
 
@@ -922,6 +913,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                         true);
                 }
 
+                AbstractContinuousMessage msg = createStartMessage(routineId, 
hnd, bufSize, interval, autoUnsubscribe, prjPred);
+
                 ctx.discovery().sendCustomEvent(msg);
             }
             catch (IgniteCheckedException e) {
@@ -1001,10 +994,15 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 reqData.p2pMarshal(marsh);
             }
 
-            return new StartRoutineDiscoveryMessage(
-                routineId,
-                reqData,
-                reqData.handler().keepBinary());
+            StartRoutineDiscoveryMessage msg = new 
StartRoutineDiscoveryMessage(
+                    routineId,
+                    reqData,
+                    reqData.handler().keepBinary());
+
+            if (hnd.updateCounters() != null)
+                msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
+
+            return msg;
         }
         else {
             assert discoProtoVer == 2 : discoProtoVer;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
index c86030e7f56..e9dff773949 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -31,9 +31,11 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -42,6 +44,9 @@ import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -56,6 +61,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * Continuous queries counter tests.
@@ -76,6 +82,8 @@ public abstract class CacheContinuousQueryCounterAbstractTest 
extends GridCommon
 
         
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 
+        cfg.setDataStreamerThreadPoolSize(2);
+
         return cfg;
     }
 
@@ -227,6 +235,75 @@ public abstract class 
CacheContinuousQueryCounterAbstractTest extends GridCommon
         }
     }
 
+    /**
+     * The main idea of the test is to emulate entries reordering after the update 
counter is already defined.
+     * Thus we can obtain a situation when entries from the same partition have already 
obtained an update counter but are finally registered in a different
+     * order due to some pauses in data streamer striped pool threads. 
A special latch on the event is used for race emulation.
+     * This test uses the assumption that {@link 
org.apache.ignite.internal.processors.cache.GridCacheMapEntry#innerSet} raises
+     * {@code EVT_CACHE_OBJECT_PUT} after the update counter was invoked.
+     */
+    @Test
+    public void testDataStreamerItemsReordered() throws 
IgniteInterruptedCheckedException {
+        AtomicInteger partitionWithSlowThread = new AtomicInteger(-1);
+        CountDownLatch partLatch = new CountDownLatch(1);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration("ds-cq-test");
+        cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 2));
+        IgniteCache<Integer, Integer> cache = 
grid(0).getOrCreateCache(cacheCfg);
+
+        grid(0).events().enableLocal(EventType.EVT_CACHE_OBJECT_PUT);
+
+        grid(0).events().localListen(e -> {
+            CacheEvent ce = (CacheEvent)e;
+            if (partitionWithSlowThread.compareAndSet(-1, ce.partition())) {
+                try {
+                    partLatch.await();
+                }
+                catch (InterruptedException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+
+            if (partitionWithSlowThread.get() == ce.partition()) {
+                partLatch.countDown();
+            }
+
+            return true;
+        }, EventType.EVT_CACHE_OBJECT_PUT);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        ConcurrentHashMap<Integer, Integer> itemsHolder = new 
ConcurrentHashMap<>();
+
+        qry.setLocalListener(events -> {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
events) {
+                itemsHolder.put(evt.getKey(), evt.getValue());
+            }
+        });
+
+        cache.query(qry);
+
+        int itemsToProc = gridCount() * 5000;
+
+        try (IgniteDataStreamer<Integer, Integer> stmr = 
grid(0).dataStreamer("ds-cq-test")) {
+            stmr.allowOverwrite(true);
+            stmr.perNodeBufferSize(1024);
+            stmr.autoFlushFrequency(500);
+
+            // Stream entries.
+            for (int i = 0; i < itemsToProc; i++) {
+                stmr.addData(i, i);
+
+                if (i == 1024)
+                    stmr.tryFlush();
+            }
+
+            stmr.flush();
+        }
+
+        assertTrue(waitForCondition(() -> itemsToProc == itemsHolder.size(), 
2000));
+    }
+
     /**
      * @throws Exception If failed.
      */

Reply via email to