Repository: ignite Updated Branches: refs/heads/master a8713f6d5 -> 0193012b2
IGNITE-9009 Do not notify local CQ listeners on partition reassignment - Fixes #4403. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0193012b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0193012b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0193012b Branch: refs/heads/master Commit: 0193012b27e73f269b33e10258eac7df89d54b68 Parents: a8713f6 Author: Denis Mekhanikov <dmekhani...@gmail.com> Authored: Tue Oct 30 19:08:58 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Oct 30 19:08:58 2018 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/ContinuousQuery.java | 11 +- .../continuous/CacheContinuousQueryHandler.java | 16 +- .../CacheContinuousQueryListener.java | 8 +- .../continuous/CacheContinuousQueryManager.java | 10 +- .../ContinuousQueryReassignmentTest.java | 170 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 2 + 6 files changed, 205 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0193012b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index e4d6d0a..0d1444b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -213,7 +213,16 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> { return (ContinuousQuery<K, V>)super.setPageSize(pageSize); } - /** {@inheritDoc} */ + /** + * Sets whether this query should be executed on local node only. + * + * Note: backup event queues are not kept for local continuous queries. It may lead to loss of notifications in case + * of node failures. Use {@link ContinuousQuery#setRemoteFilterFactory(Factory)} to register cache event listeners + * on all cache nodes, if delivery guarantee is required. + * + * @param loc Local flag. + * @return {@code this} for chaining. + */ @Override public ContinuousQuery<K, V> setLocal(boolean loc) { return (ContinuousQuery<K, V>)super.setLocal(loc); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0193012b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index d1640c6..ade360a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -144,7 +144,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient boolean skipPrimaryCheck; /** */ - private boolean locCache; + private transient boolean locOnly; /** */ private boolean keepBinary; @@ -247,10 +247,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** - * @param locCache Local cache. + * @param locOnly Local only. */ - public void localCache(boolean locCache) { - this.locCache = locCache; + public void localOnly(boolean locOnly) { + this.locOnly = locOnly; } /** @@ -514,7 +514,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler skipCtx = new CounterSkipContext(part, cntr, topVer); if (loc) { - assert !locCache; + assert !locOnly; final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry()); @@ -583,6 +583,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private String taskName() { return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } + + @Override public boolean isPrimaryOnly() { + return locOnly && !skipPrimaryCheck; + } }; CacheContinuousQueryManager mgr = manager(ctx); @@ -860,7 +864,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans = getTransformer(); if (loc) { - if (!locCache) { + if (!locOnly) { Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); notifyLocalListener(evts, trans); http://git-wip-us.apache.org/repos/asf/ignite/blob/0193012b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 7da657f..029b6d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; +import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -114,4 +115,9 @@ public interface CacheContinuousQueryListener<K, V> { * @return Whether to notify on existing entries. */ public boolean notifyExisting(); -} \ No newline at end of file + + /** + * @return {@code True} if this listener should be called on events on primary partitions only. + */ + public boolean isPrimaryOnly(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0193012b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c41e1a3..6bd3fc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -385,7 +385,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean recordIgniteEvt = primary && !internal && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { - if (preload && !lsnr.notifyExisting()) + if (preload && !lsnr.notifyExisting() || lsnr.isPrimaryOnly() && !primary) continue; if (!initialized) { @@ -722,12 +722,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { final CacheContinuousQueryHandler hnd = clsr.apply(); + boolean locOnly = cctx.isLocal() || loc; + hnd.taskNameHash(taskNameHash); hnd.skipPrimaryCheck(skipPrimaryCheck); hnd.notifyExisting(notifyExisting); hnd.internal(internal); hnd.keepBinary(keepBinary); - hnd.localCache(cctx.isLocal()); + hnd.localOnly(locOnly); IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : cctx.group().nodeFilter(); @@ -739,13 +741,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { try { id = cctx.kernalContext().continuous().startRoutine( hnd, - internal && loc, + locOnly, bufSize, timeInterval, autoUnsubscribe, pred).get(); - if (hnd.isQuery() && cctx.userCache() && !onStart) + if (hnd.isQuery() && cctx.userCache() && !locOnly && !onStart) hnd.waitTopologyFuture(cctx.kernalContext()); } catch (NodeStoppingException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0193012b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryReassignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryReassignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryReassignmentTest.java new file mode 100644 index 0000000..2a537ca --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryReassignmentTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** + * + */ +public class ContinuousQueryReassignmentTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return igniteCfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override public boolean isDebug() { + return true; + } + + /** + * + * @throws Exception If failed. + */ + public void testContinuousQueryNotCalledOnReassignment() throws Exception { + testContinuousQueryNotCalledOnReassignment(false); + } + + /** + * @throws Exception if failed. + */ + public void testLocalContinuousQueryNotCalledOnReassignment() throws Exception { + testContinuousQueryNotCalledOnReassignment(true); + } + + /** + * @param loc If {@code true}, then local continuous query will be tested. + * @throws Exception If failed. + */ + private void testContinuousQueryNotCalledOnReassignment(boolean loc) throws Exception { + Ignite lsnrNode = startGrid(1); + Ignite victim = startGrid(2); + + awaitPartitionMapExchange(); + + CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("cache"); + cacheCfg.setBackups(1); + IgniteCache<Integer, String> cache = lsnrNode.getOrCreateCache(cacheCfg); + + AtomicInteger updCntr = new AtomicInteger(); + + listenToUpdates(cache, loc, updCntr, null); + + // Subscribe on all nodes to receive all updates. + if (loc) + listenToUpdates(victim.cache("cache"), true, updCntr, null); + + int updates = 1000; + + for (int i = 0; i < updates; i++) + cache.put(i, Integer.toString(i)); + + assertTrue( + "Failed to wait for continuous query updates. Exp: " + updates + "; actual: " + updCntr.get(), + waitForCondition(() -> updCntr.get() == updates, 10000)); + + victim.close(); + + assertFalse("Continuous query is called on reassignment.", + waitForCondition(() -> updCntr.get() > updates, 2000)); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousQueryWithRemoteFilterNotCalledOnReassignment() throws Exception { + Ignite lsnrNode = startGrid(1); + Ignite victim = startGrid(2); + + awaitPartitionMapExchange(); + + CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("cache"); + cacheCfg.setBackups(1); + IgniteCache<Integer, String> cache = lsnrNode.getOrCreateCache(cacheCfg); + + AtomicInteger updCntr = new AtomicInteger(); + + CacheEntryEventSerializableFilter<Integer, String> filter = (e) -> e.getKey() % 2 == 0; + + listenToUpdates(cache, false, updCntr, filter); + + int updates = 1000; + + for (int i = 0; i < updates; i++) + cache.put(i, Integer.toString(i)); + + assertTrue( + "Failed to wait for continuous query updates. Exp: " + updates + "; actual: " + updCntr.get(), + waitForCondition(() -> updCntr.get() == updates / 2, 10000)); + + victim.close(); + + assertFalse("Continuous query is called on reassignment.", + waitForCondition(() -> updCntr.get() > updates / 2, 2000)); + } + + /** + * Register a continuous query, that counts updates on the provided cache. + * + * @param cache Cache. + * @param loc If {@code true}, then local continuous query will be registered. + * @param updCntr Update counter. + * @param rmtFilter Remote filter. + */ + private void listenToUpdates(IgniteCache<Integer, String> cache, boolean loc, AtomicInteger updCntr, + CacheEntryEventSerializableFilter<Integer, String> rmtFilter) { + + ContinuousQuery<Integer, String> cq = new ContinuousQuery<>(); + cq.setLocal(loc); + cq.setLocalListener((evts) -> { + for (CacheEntryEvent e : evts) + updCntr.incrementAndGet(); + }); + if (rmtFilter != null) + cq.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFilter)); + + cache.query(cq); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0193012b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index f6b8a08..b16008b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryMarshallerTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryPeerClassLoadingTest; +import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryReassignmentTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest; @@ -137,6 +138,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(ClientReconnectContinuousQueryTest.class); suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class); suite.addTestSuite(ContinuousQueryMarshallerTest.class); + suite.addTestSuite(ContinuousQueryReassignmentTest.class); suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class); suite.addTestSuite(CacheContinuousQueryEventBufferTest.class);