Repository: ignite Updated Branches: refs/heads/ignite-comm-balance 928c805dc -> 2b25194c2
IGNITE-3907 Fixed "Incorrect initialization CQ when node filter configured for cache" Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebf354c5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebf354c5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebf354c5 Branch: refs/heads/ignite-comm-balance Commit: ebf354c568d0802b7eed1cc6b9d251941dbce014 Parents: 2474e2b Author: nikolay_tikhonov <[email protected]> Authored: Fri Sep 16 14:32:13 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Sep 16 14:32:13 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 5 - .../internal/GridMessageListenHandler.java | 5 - .../continuous/CacheContinuousQueryHandler.java | 5 - .../continuous/GridContinuousHandler.java | 8 - .../continuous/GridContinuousProcessor.java | 33 ++-- ...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++++++++++++ 6 files changed, 170 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index b4b1e58..ed6998d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -262,11 +262,6 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void unregister(UUID routineId, GridKernalContext ctx) { assert routineId != null; assert ctx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 2b8041d..1bca85c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -139,11 +139,6 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void unregister(UUID routineId, GridKernalContext ctx) { ctx.io().removeUserMessageListener(topic, pred); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 7b3b47b..a5752ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -564,11 +564,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void unregister(UUID routineId, GridKernalContext ctx) { assert routineId != null; assert ctx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index c90746d..f14b450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -57,14 +57,6 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public RegisterStatus register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException; /** - * Callback called after listener is registered and acknowledgement is sent. - * - * @param routineId Routine ID. - * @param ctx Kernal context. - */ - public void onListenerRegistered(UUID routineId, GridKernalContext ctx); - - /** * Unregisters listener. * * @param routineId Routine ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 5f61051..ad7ad4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -478,11 +478,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Register handler only if local node passes projection predicate. if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) && - !locInfos.containsKey(item.routineId)) { - if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, - item.autoUnsubscribe, false)) - item.hnd.onListenerRegistered(item.routineId, ctx); - } + !locInfos.containsKey(item.routineId)) + registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, + item.autoUnsubscribe, false); if (!item.autoUnsubscribe) // Register routine locally. @@ -509,14 +507,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.resource().injectGeneric(info.prjPred); if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { - if (registerHandler(clientNodeId, + registerHandler(clientNodeId, routineId, info.hnd, info.bufSize, info.interval, info.autoUnsubscribe, - false)) - info.hnd.onListenerRegistered(routineId, ctx); + false); } } catch (IgniteCheckedException err) { @@ -555,9 +552,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx); assert status != GridContinuousHandler.RegisterStatus.DELAYED; - - if (status == GridContinuousHandler.RegisterStatus.REGISTERED) - hnd.onListenerRegistered(routineId, this.ctx); } } } @@ -649,8 +643,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); - hnd.onListenerRegistered(routineId, ctx); - return new GridFinishedFuture<>(routineId); } catch (IgniteCheckedException e) { @@ -700,9 +692,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { startFuts.put(routineId, fut); try { - if (locIncluded - && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) - hnd.onListenerRegistered(routineId, ctx); + if (locIncluded || hnd.isQuery()) + registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, reqData.handler().keepBinary())); @@ -1020,8 +1011,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { data.autoUnsubscribe())); } - boolean registered = false; - if (err == null) { try { IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); @@ -1030,10 +1019,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.resource().injectGeneric(prjPred); if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && - !locInfos.containsKey(routineId)) { - registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), + !locInfos.containsKey(routineId)) + registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe(), false); - } if (!data.autoUnsubscribe()) // Register routine locally. @@ -1061,9 +1049,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (err != null) req.addError(ctx.localNodeId(), err); - - if (registered) - hnd0.onListenerRegistered(routineId, ctx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf354c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java index 7000446..cf0c0d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java @@ -17,9 +17,17 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import javax.cache.configuration.Factory; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.event.CacheEntryCreatedListener; @@ -33,9 +41,12 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -45,8 +56,10 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** */ @SuppressWarnings("unchecked") @@ -57,13 +70,21 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA /** */ private static final int SERVER_GRIDS_COUNT = 6; + /** */ + public static final int KEYS = 2_000; + /** Cache entry operations' counts. */ private static final ConcurrentMap<String, AtomicInteger> opCounts = new ConcurrentHashMap8<>(); + /** Client. */ + private static boolean client = false; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); + client = false; + super.afterTest(); } @@ -122,6 +143,108 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA } } + /** + * @throws Exception If failed. + */ + public void testWithNodeFilter() throws Exception { + List<QueryCursor> qryCursors = new ArrayList<>(); + + final int nodesCnt = 3; + + startGridsMultiThreaded(nodesCnt); + + awaitPartitionMapExchange(); + + CacheConfiguration ccfg = cacheConfiguration(new NodeFilterByRegexp(".*(0|1)$")); + + grid(0).createCache(ccfg); + + final AtomicInteger cntr = new AtomicInteger(); + + final ConcurrentMap<ClusterNode, Set<Integer>> maps = new ConcurrentHashMap<>(); + + final AtomicBoolean doubleNtfFail = new AtomicBoolean(false); + + CacheEntryUpdatedListener<Integer, Integer> lsnr = new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { + cntr.incrementAndGet(); + + ClusterNode node = ((Ignite)e.getSource().unwrap(Ignite.class)).cluster().localNode(); + + Set<Integer> set = maps.get(node); + + if (set == null) { + set = new ConcurrentSkipListSet<>(); + + Set<Integer> oldVal = maps.putIfAbsent(node, set); + + set = oldVal != null ? oldVal : set; + } + + if (!set.add(e.getValue())) + doubleNtfFail.set(false); + } + } + }; + + for (int i = 0; i < nodesCnt; i++) { + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + Ignite ignite = grid(i); + + log.info("Try to start CQ on node: " + ignite.cluster().localNode().id()); + + qryCursors.add(ignite.cache(ccfg.getName()).query(qry)); + + log.info("CQ started on node: " + ignite.cluster().localNode().id()); + } + + client = true; + + startGrid(nodesCnt); + + awaitPartitionMapExchange(); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + qryCursors.add(grid(nodesCnt).cache(ccfg.getName()).query(qry)); + + for (int i = 0; i <= nodesCnt; i++) { + for (int key = 0; key < KEYS; key++) { + int val = (i * KEYS) + key; + + grid(i).cache(ccfg.getName()).put(val, val); + } + } + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cntr.get() >= 2 * (nodesCnt + 1) * KEYS; + } + }, 5000L)); + + assertFalse("Got duplicate", doubleNtfFail.get()); + + for (int i = 0; i < (nodesCnt + 1) * KEYS; i++) { + for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet()) + assertTrue("Lost event on node: " + e.getKey().id() + ", event: " + i, e.getValue().remove(i)); + } + + for (Map.Entry<ClusterNode, Set<Integer>> e : maps.entrySet()) + assertTrue("Unexpected event on node: " + e.getKey(), e.getValue().isEmpty()); + + assertEquals("Not expected count of CQ", nodesCnt + 1, qryCursors.size()); + + for (QueryCursor cur : qryCursors) + cur.close(); + } + /** */ private Ignite startGrid(final int idx, boolean isClientMode) throws Exception { String gridName = getTestGridName(idx); @@ -179,6 +302,28 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA return node; } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param filter Node filter. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(NodeFilterByRegexp filter) { + return new CacheConfiguration("test-cache-cq") + .setBackups(1) + .setNodeFilter(filter) + .setAtomicityMode(ATOMIC) + .setWriteSynchronizationMode(FULL_SYNC) + .setCacheMode(PARTITIONED); + } + /** */ private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration { /** Operation. */ @@ -275,4 +420,20 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA return ((Integer)clusterNode.attributes().get("idx") % 2) == idx % 2; } } + + /** */ + private final static class NodeFilterByRegexp implements IgnitePredicate<ClusterNode> { + /** */ + private final Pattern pattern; + + /** */ + private NodeFilterByRegexp(String regExp) { + this.pattern = Pattern.compile(regExp); + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return pattern.matcher(clusterNode.id().toString()).matches(); + } + } }
