Repository: ignite
Updated Branches:
  refs/heads/master 5281a0fe6 -> 19c7f3882


IGNITE-3907 Fixed "Incorrect initialization CQ when node filter configured for 
cache"


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebf354c5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebf354c5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebf354c5

Branch: refs/heads/master
Commit: ebf354c568d0802b7eed1cc6b9d251941dbce014
Parents: 2474e2b
Author: nikolay_tikhonov <ntikho...@gridgain.com>
Authored: Fri Sep 16 14:32:13 2016 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Fri Sep 16 14:32:13 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   5 -
 .../internal/GridMessageListenHandler.java      |   5 -
 .../continuous/CacheContinuousQueryHandler.java |   5 -
 .../continuous/GridContinuousHandler.java       |   8 -
 .../continuous/GridContinuousProcessor.java     |  33 ++--
 ...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++++++++++++
 6 files changed, 170 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index b4b1e58..ed6998d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -262,11 +262,6 @@ class GridEventConsumeHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, 
GridKernalContext ctx) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void unregister(UUID routineId, GridKernalContext ctx) {
         assert routineId != null;
         assert ctx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 2b8041d..1bca85c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -139,11 +139,6 @@ public class GridMessageListenHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, 
GridKernalContext ctx) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void unregister(UUID routineId, GridKernalContext ctx) {
         ctx.io().removeUserMessageListener(topic, pred);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7b3b47b..a5752ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -564,11 +564,6 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void onListenerRegistered(UUID routineId, 
GridKernalContext ctx) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void unregister(UUID routineId, GridKernalContext ctx) {
         assert routineId != null;
         assert ctx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index c90746d..f14b450 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -57,14 +57,6 @@ public interface GridContinuousHandler extends 
Externalizable, Cloneable {
     public RegisterStatus register(UUID nodeId, UUID routineId, 
GridKernalContext ctx) throws IgniteCheckedException;
 
     /**
-     * Callback called after listener is registered and acknowledgement is 
sent.
-     *
-     * @param routineId Routine ID.
-     * @param ctx Kernal context.
-     */
-    public void onListenerRegistered(UUID routineId, GridKernalContext ctx);
-
-    /**
      * Unregisters listener.
      *
      * @param routineId Routine ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 5f61051..ad7ad4f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -478,11 +478,9 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                     // Register handler only if local node passes projection 
predicate.
                     if ((item.prjPred == null || 
item.prjPred.apply(ctx.discovery().localNode())) &&
-                        !locInfos.containsKey(item.routineId)) {
-                        if (registerHandler(data.nodeId, item.routineId, 
item.hnd, item.bufSize, item.interval,
-                            item.autoUnsubscribe, false))
-                            item.hnd.onListenerRegistered(item.routineId, ctx);
-                    }
+                        !locInfos.containsKey(item.routineId))
+                        registerHandler(data.nodeId, item.routineId, item.hnd, 
item.bufSize, item.interval,
+                            item.autoUnsubscribe, false);
 
                     if (!item.autoUnsubscribe)
                         // Register routine locally.
@@ -509,14 +507,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                                 ctx.resource().injectGeneric(info.prjPred);
 
                             if (info.prjPred == null || 
info.prjPred.apply(ctx.discovery().localNode())) {
-                                if (registerHandler(clientNodeId,
+                                registerHandler(clientNodeId,
                                     routineId,
                                     info.hnd,
                                     info.bufSize,
                                     info.interval,
                                     info.autoUnsubscribe,
-                                    false))
-                                    info.hnd.onListenerRegistered(routineId, 
ctx);
+                                    false);
                             }
                         }
                         catch (IgniteCheckedException err) {
@@ -555,9 +552,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 GridContinuousHandler.RegisterStatus status = 
hnd.register(rmtInfo.nodeId, routineId, this.ctx);
 
                 assert status != GridContinuousHandler.RegisterStatus.DELAYED;
-
-                if (status == GridContinuousHandler.RegisterStatus.REGISTERED)
-                    hnd.onListenerRegistered(routineId, this.ctx);
             }
         }
     }
@@ -649,8 +643,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             try {
                 registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, 
interval, autoUnsubscribe, true);
 
-                hnd.onListenerRegistered(routineId, ctx);
-
                 return new GridFinishedFuture<>(routineId);
             }
             catch (IgniteCheckedException e) {
@@ -700,9 +692,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         startFuts.put(routineId, fut);
 
         try {
-            if (locIncluded
-                && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, 
interval, autoUnsubscribe, true))
-                hnd.onListenerRegistered(routineId, ctx);
+            if (locIncluded || hnd.isQuery())
+                registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, 
interval, autoUnsubscribe, true);
 
             ctx.discovery().sendCustomEvent(new 
StartRoutineDiscoveryMessage(routineId, reqData,
                 reqData.handler().keepBinary()));
@@ -1020,8 +1011,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 data.autoUnsubscribe()));
         }
 
-        boolean registered = false;
-
         if (err == null) {
             try {
                 IgnitePredicate<ClusterNode> prjPred = 
data.projectionPredicate();
@@ -1030,10 +1019,9 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     ctx.resource().injectGeneric(prjPred);
 
                 if ((prjPred == null || 
prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
-                    !locInfos.containsKey(routineId)) {
-                    registered = registerHandler(node.id(), routineId, hnd0, 
data.bufferSize(), data.interval(),
+                    !locInfos.containsKey(routineId))
+                    registerHandler(node.id(), routineId, hnd0, 
data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
-                }
 
                 if (!data.autoUnsubscribe())
                     // Register routine locally.
@@ -1061,9 +1049,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         if (err != null)
             req.addError(ctx.localNodeId(), err);
-
-        if (registered)
-            hnd0.onListenerRegistered(routineId, ctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
index 7000446..cf0c0d9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
@@ -33,9 +41,12 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -45,8 +56,10 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /** */
 @SuppressWarnings("unchecked")
@@ -57,13 +70,21 @@ public class 
GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
     /** */
     private static final int SERVER_GRIDS_COUNT = 6;
 
+    /** Number of distinct keys each node puts into the cache in {@link #testWithNodeFilter()}. */
+    public static final int KEYS = 2_000;
+
     /** Cache entry operations' counts. */
     private static final ConcurrentMap<String, AtomicInteger> opCounts = new 
ConcurrentHashMap8<>();
 
+    /** Client mode flag passed to the configuration of grids started afterwards; reset to {@code false} after each test. */
+    private static boolean client = false;
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
+        client = false;
+
         super.afterTest();
     }
 
@@ -122,6 +143,108 @@ public class 
GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
         }
     }
 
+    /**
+     * Starts a continuous query from each server node and from one client node on a cache configured with a
+     * node filter, puts {@link #KEYS} distinct values through every node, and checks that no node-local
+     * listener saw a value twice and that every node which received events saw all of the values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWithNodeFilter() throws Exception {
+        List<QueryCursor> qryCursors = new ArrayList<>();
+
+        final int nodesCnt = 3;
+
+        startGridsMultiThreaded(nodesCnt);
+
+        awaitPartitionMapExchange();
+
+        CacheConfiguration ccfg = cacheConfiguration(new NodeFilterByRegexp(".*(0|1)$"));
+
+        grid(0).createCache(ccfg);
+
+        // Total number of notifications received by all listeners.
+        final AtomicInteger cntr = new AtomicInteger();
+
+        // Values received so far, grouped by the node on which the listener was invoked.
+        final ConcurrentMap<ClusterNode, Set<Integer>> maps = new ConcurrentHashMap<>();
+
+        // Raised by the listener when the same value is delivered twice on one node.
+        final AtomicBoolean doubleNtfFail = new AtomicBoolean(false);
+
+        CacheEntryUpdatedListener<Integer, Integer> lsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
+                throws CacheEntryListenerException {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr.incrementAndGet();
+
+                    ClusterNode node = ((Ignite)e.getSource().unwrap(Ignite.class)).cluster().localNode();
+
+                    Set<Integer> set = maps.get(node);
+
+                    if (set == null) {
+                        set = new ConcurrentSkipListSet<>();
+
+                        Set<Integer> oldVal = maps.putIfAbsent(node, set);
+
+                        set = oldVal != null ? oldVal : set;
+                    }
+
+                    // Every value is put exactly once, so a repeated value on the same node is a duplicate
+                    // notification. The flag must be raised here, otherwise the assertion below can never fail.
+                    if (!set.add(e.getValue()))
+                        doubleNtfFail.set(true);
+                }
+            }
+        };
+
+        // Start a continuous query from each of the initially started nodes.
+        for (int i = 0; i < nodesCnt; i++) {
+            ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(lsnr);
+
+            Ignite ignite = grid(i);
+
+            log.info("Try to start CQ on node: " + ignite.cluster().localNode().id());
+
+            qryCursors.add(ignite.cache(ccfg.getName()).query(qry));
+
+            log.info("CQ started on node: " + ignite.cluster().localNode().id());
+        }
+
+        // The next grid is configured in client mode (see getConfiguration).
+        client = true;
+
+        startGrid(nodesCnt);
+
+        awaitPartitionMapExchange();
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        qryCursors.add(grid(nodesCnt).cache(ccfg.getName()).query(qry));
+
+        // Node i writes the disjoint range [i * KEYS, (i + 1) * KEYS), so each value is written exactly once.
+        for (int i = 0; i <= nodesCnt; i++) {
+            for (int key = 0; key < KEYS; key++) {
+                int val = (i * KEYS) + key;
+
+                grid(i).cache(ccfg.getName()).put(val, val);
+            }
+        }
+
+        // NOTE(review): if all nodesCnt + 1 listeners receive every update, the total is
+        // (nodesCnt + 1) * (nodesCnt + 1) * KEYS, so this threshold may be reached before delivery
+        // has completed - confirm the intended bound.
+        assertTrue(GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return cntr.get() >= 2 * (nodesCnt + 1) * KEYS;
+            }
+        }, 5000L));
+
+        assertFalse("Got duplicate", doubleNtfFail.get());
+
+        // Every node that received events must have seen each written value.
+        for (int i = 0; i < (nodesCnt + 1) * KEYS; i++) {
+            for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet())
+                assertTrue("Lost event on node: " + e.getKey().id() + ", event: " + i, e.getValue().remove(i));
+        }
+
+        // All expected values were removed above, so anything left is an unexpected event.
+        for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet())
+            assertTrue("Unexpected event on node: " + e.getKey(), e.getValue().isEmpty());
+
+        assertEquals("Not expected count of CQ", nodesCnt + 1, qryCursors.size());
+
+        for (QueryCursor cur : qryCursors)
+            cur.close();
+    }
+
     /** */
     private Ignite startGrid(final int idx, boolean isClientMode) throws 
Exception {
         String gridName = getTestGridName(idx);
@@ -179,6 +302,28 @@ public class 
GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
         return node;
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(client); // Static flag: a test sets it before starting a grid that must run in client mode.
+
+        return cfg;
+    }
+
+    /**
+     * @param filter Node filter to set on the cache.
+     * @return Configuration of the PARTITIONED, ATOMIC, FULL_SYNC cache "test-cache-cq" with one backup.
+     */
+    private CacheConfiguration cacheConfiguration(NodeFilterByRegexp filter) {
+        return new CacheConfiguration("test-cache-cq")
+            .setBackups(1)
+            .setNodeFilter(filter)
+            .setAtomicityMode(ATOMIC)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setCacheMode(PARTITIONED);
+    }
+
     /** */
     private final static class ListenerConfiguration extends 
MutableCacheEntryListenerConfiguration {
         /** Operation. */
@@ -275,4 +420,20 @@ public class 
GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
             return ((Integer)clusterNode.attributes().get("idx") % 2) == idx % 
2;
         }
     }
+
+    /** Node predicate that accepts a node when the string form of its ID matches a regular expression. */
+    private final static class NodeFilterByRegexp implements 
IgnitePredicate<ClusterNode> {
+        /** Compiled regular expression applied to node IDs. */
+        private final Pattern pattern;
+
+        /** @param regExp Regular expression to compile. */
+        private NodeFilterByRegexp(String regExp) {
+            this.pattern = Pattern.compile(regExp);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode clusterNode) {
+            return pattern.matcher(clusterNode.id().toString()).matches(); // matches() requires the whole ID string to match.
+        }
+    }
 }

Reply via email to