ignite-3413 Use cache node filter for continuous query registration
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89d64e74 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89d64e74 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89d64e74 Branch: refs/heads/ignite-3414 Commit: 89d64e74b697054a88c3a91433aaaf4f7fdd0284 Parents: a056954 Author: sboikov <[email protected]> Authored: Wed Jul 13 12:41:18 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 13 12:41:18 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryManager.java | 4 +- ...eContinuousQueryMultiNodesFilteringTest.java | 278 +++++++++++++++++++ ...dCacheContinuousQueryNodesFilteringTest.java | 168 +++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 4 + 4 files changed, 453 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/89d64e74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c966527..195f3ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -642,7 +642,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { hnd.localCache(cctx.isLocal()); IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? - F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); + F.nodeForNodeId(cctx.localNodeId()) : cctx.config().getNodeFilter(); + + assert pred != null : cctx.config(); UUID id = cctx.kernalContext().continuous().startRoutine( hnd, http://git-wip-us.apache.org/repos/asf/ignite/blob/89d64e74/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java new file mode 100644 index 0000000..7000446 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.Collections; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.Factory; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListener; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** */ +@SuppressWarnings("unchecked") +public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SERVER_GRIDS_COUNT = 6; + + /** Cache entry operations' counts. */ + private static final ConcurrentMap<String, AtomicInteger> opCounts = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** */ + public void testFiltersAndListeners() throws Exception { + for (int i = 1; i <= SERVER_GRIDS_COUNT; i++) + startGrid(i, false); + + startGrid(SERVER_GRIDS_COUNT + 1, true); + + for (int i = 1; i <= SERVER_GRIDS_COUNT + 1; i++) { + for (int j = 0; j < i; j++) { + jcache(i, "part" + i).put("k" + j, "v0"); + jcache(i, "repl" + i).put("k" + j, "v0"); + + // Should trigger updates + jcache(i, "part" + i).put("k" + j, "v1"); + jcache(i, "repl" + i).put("k" + j, "v1"); + + jcache(i, "part" + i).remove("k" + j); + jcache(i, "repl" + i).remove("k" + j); + } + } + + for (int i = 1; i <= SERVER_GRIDS_COUNT + 1; i++) { + // For each i, we did 3 ops on 2 caches on i keys, hence expected number. + final int expTotal = i * 3 * 2; + final int i0 = i; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return opCounts.get("qry" + i0 + "_total").get() == expTotal; + } + }, 5000); + + int partInserts = opCounts.get("part" + i + "_ins").get(); + int replInserts = opCounts.get("repl" + i + "_ins").get(); + int partUpdates = opCounts.get("part" + i + "_upd").get(); + int replUpdates = opCounts.get("repl" + i + "_upd").get(); + int partRemoves = opCounts.get("part" + i + "_rmv").get(); + int replRemoves = opCounts.get("repl" + i + "_rmv").get(); + int totalQryOps = opCounts.get("qry" + i + "_total").get(); + + assertEquals(i, partInserts); + assertEquals(i, replInserts); + + assertEquals(i, partUpdates); + assertEquals(i, replUpdates); + + assertEquals(i, partRemoves); + assertEquals(i, replRemoves); + + assertEquals(expTotal, totalQryOps); + + assertEquals(totalQryOps, partInserts + replInserts + partUpdates + replUpdates + partRemoves + replRemoves); + } + } + + /** */ + private Ignite startGrid(final int idx, boolean isClientMode) throws Exception { + String gridName = getTestGridName(idx); + + IgniteConfiguration cfg = optimize(getConfiguration(gridName)).setClientMode(isClientMode); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setUserAttributes(Collections.singletonMap("idx", idx)); + + Ignite node = startGrid(gridName, cfg); + + IgnitePredicate<ClusterNode> nodeFilter = new NodeFilter(idx); + + String partCacheName = "part" + idx; + + IgniteCache partCache = node.createCache(defaultCacheConfiguration().setName("part" + idx) + .setCacheMode(PARTITIONED).setBackups(1).setNodeFilter(nodeFilter)); + + opCounts.put(partCacheName + "_ins", new AtomicInteger()); + opCounts.put(partCacheName + "_upd", new AtomicInteger()); + opCounts.put(partCacheName + "_rmv", new AtomicInteger()); + + partCache.registerCacheEntryListener(new ListenerConfiguration(partCacheName, ListenerConfiguration.Op.INSERT)); + partCache.registerCacheEntryListener(new ListenerConfiguration(partCacheName, ListenerConfiguration.Op.UPDATE)); + partCache.registerCacheEntryListener(new ListenerConfiguration(partCacheName, ListenerConfiguration.Op.REMOVE)); + + String replCacheName = "repl" + idx; + + IgniteCache replCache = node.createCache(defaultCacheConfiguration().setName("repl" + idx) + .setCacheMode(REPLICATED).setNodeFilter(nodeFilter)); + + opCounts.put(replCacheName + "_ins", new AtomicInteger()); + opCounts.put(replCacheName + "_upd", new AtomicInteger()); + opCounts.put(replCacheName + "_rmv", new AtomicInteger()); + + replCache.registerCacheEntryListener(new ListenerConfiguration(replCacheName, ListenerConfiguration.Op.INSERT)); + replCache.registerCacheEntryListener(new ListenerConfiguration(replCacheName, ListenerConfiguration.Op.UPDATE)); + replCache.registerCacheEntryListener(new ListenerConfiguration(replCacheName, ListenerConfiguration.Op.REMOVE)); + + opCounts.put("qry" + idx + "_total", new AtomicInteger()); + + ContinuousQuery qry = new ContinuousQuery(); + qry.setRemoteFilterFactory(new EntryEventFilterFactory(idx)); + qry.setLocalListener(new CacheEntryUpdatedListener() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable evts) { + opCounts.get("qry" + idx + "_total").incrementAndGet(); + } + }); + + partCache.query(qry); + replCache.query(qry); + + return node; + } + + /** */ + private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration { + /** Operation. */ + enum Op { + /** Insert. */ + INSERT, + + /** Update. */ + UPDATE, + + /** Remove. */ + REMOVE + } + + /** */ + ListenerConfiguration(final String cacheName, final Op op) { + super(new Factory<CacheEntryListener>() { + /** {@inheritDoc} */ + @Override public CacheEntryListener create() { + switch (op) { + case INSERT: + return new CacheEntryCreatedListener() { + /** {@inheritDoc} */ + @Override public void onCreated(Iterable iterable) { + for (Object evt : iterable) + opCounts.get(cacheName + "_ins").getAndIncrement(); + } + }; + case UPDATE: + return new CacheEntryUpdatedListener() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable iterable) { + for (Object evt : iterable) + opCounts.get(cacheName + "_upd").getAndIncrement(); + } + }; + case REMOVE: + return new CacheEntryRemovedListener() { + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable iterable) { + for (Object evt : iterable) + opCounts.get(cacheName + "_rmv").getAndIncrement(); + } + }; + default: + throw new IgniteException(new IllegalArgumentException()); + } + } + }, null, true, false); + } + } + + /** */ + private final static class EntryEventFilterFactory implements Factory<CacheEntryEventFilter> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** Grid index to determine whether node filter has been invoked. */ + private final int idx; + + /** */ + private EntryEventFilterFactory(int idx) { + this.idx = idx; + } + + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter create() { + return new CacheEntryEventFilter() { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException { + int evtNodeIdx = (Integer)(ignite.cluster().localNode().attributes().get("idx")); + + assertTrue(evtNodeIdx % 2 == idx % 2); + + return true; + } + }; + } + } + + /** */ + private final static class NodeFilter implements IgnitePredicate<ClusterNode> { + /** */ + private final int idx; + + /** */ + private NodeFilter(int idx) { + this.idx = idx; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return ((Integer)clusterNode.attributes().get("idx") % 2) == idx % 2; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89d64e74/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java new file mode 100644 index 0000000..dccde65 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.Collections; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridStringLogger; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** */ +@SuppressWarnings("unused") +public class GridCacheContinuousQueryNodesFilteringTest extends GridCommonAbstractTest implements Serializable { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String ENTRY_FILTER_CLS_NAME = "org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter"; + + /** + * Tests that node not matched by filter really does not try to participate in the query. + * + * @throws Exception if failed. + */ + @SuppressWarnings("EmptyTryBlock") + public void testNodeWithoutAttributeExclusion() throws Exception { + try (Ignite node1 = startNodeWithCache()) { + try (Ignite node2 = startGrid("node2", getConfiguration("node2", false, null))) { + assertEquals(2, node2.cluster().nodes().size()); + } + } + } + + /** + * Test that node matched by filter and having filter instantiation problems fails for sure. + * + * @throws Exception if failed. + */ + public void testNodeWithAttributeFailure() throws Exception { + try (Ignite node1 = startNodeWithCache()) { + GridStringLogger log = new GridStringLogger(); + + try (Ignite node2 = startGrid("node2", getConfiguration("node2", true, log))) { + fail(); + } + catch (IgniteException e) { + assertTrue(log.toString().contains("Class not found for continuous query remote filter " + + "[name=org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter]")); + } + } + } + + /** + * Start first, attribute-bearing, node, create new cache and launch continuous query on it. + * + * @return Node. + * @throws Exception if failed. + */ + private Ignite startNodeWithCache() throws Exception { + Ignite node1 = startGrid("node1", getConfiguration("node1", true, null)); + + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + ccfg.setName("attrsTestCache"); + ccfg.setNodeFilter(new IgnitePredicate<ClusterNode>() { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return "data".equals(node.attribute("node-type")); + } + }); + + IgniteCache<Integer, Integer> cache = node1.createCache(ccfg); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setRemoteFilterFactory(new RemoteFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + // No-op. + } + }); + + RemoteFilterFactory.clsLdr = getExternalClassLoader(); + + cache.query(qry); + + // Switch class loader before starting the second node. + RemoteFilterFactory.clsLdr = getClass().getClassLoader(); + + return node1; + } + + /** + * @param name Node name. + * @param setAttr Flag indicating whether node user attribute should be set. + * @param log Logger. + * @return Node configuration w/specified name. + * @throws Exception If failed. + */ + private IgniteConfiguration getConfiguration(String name, boolean setAttr, GridStringLogger log) throws Exception { + IgniteConfiguration cfg = optimize(getConfiguration(name)); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (setAttr) + cfg.setUserAttributes(Collections.singletonMap("node-type", "data")); + + cfg.setGridLogger(log); + + return cfg; + } + + /** + * + */ + private static class RemoteFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> { + /** */ + private static ClassLoader clsLdr; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheEntryEventFilter<Integer, Integer> create() { + try { + Class<?> filterCls = clsLdr.loadClass(ENTRY_FILTER_CLS_NAME); + + assert CacheEntryEventFilter.class.isAssignableFrom(filterCls); + + return ((Class<CacheEntryEventFilter>)filterCls).newInstance(); + } + catch (ClassNotFoundException e) { + throw new IgniteException("Class not found for continuous query remote filter [name=" + + e.getMessage() + "]"); + } + catch (Exception e) { // We really don't expect anything else fancy here. + throw new AssertionError("Unexpected exception", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89d64e74/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index a1a32a1..abf4ac3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -43,6 +43,8 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest; +import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryNodesFilteringTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionTxOneNodeTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedOnlySelfTest; @@ -115,6 +117,8 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class); suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); suite.addTestSuite(ContinuousQueryRemoteFilterMissingInClassPathSelfTest.class); + suite.addTestSuite(GridCacheContinuousQueryNodesFilteringTest.class); + suite.addTestSuite(GridCacheContinuousQueryMultiNodesFilteringTest.class); suite.addTestSuite(IgniteCacheContinuousQueryImmutableEntryTest.class); suite.addTestSuite(CacheKeepBinaryIterationTest.class); suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);
