Repository: ignite Updated Branches: refs/heads/ignite-5075 9585dd326 -> ef1105d12
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ef1105d1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ef1105d1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ef1105d1 Branch: refs/heads/ignite-5075 Commit: ef1105d122eb43a89630b79da4b8921b0a4bdccc Parents: 9585dd3 Author: sboikov <[email protected]> Authored: Fri May 26 15:28:33 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 26 16:23:14 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 2 +- .../cache/CacheGroupInfrastructure.java | 123 +++++++++++++++++-- .../processors/cache/ClusterCachesInfo.java | 6 +- .../processors/cache/GridCacheContext.java | 8 -- .../processors/cache/GridCacheProcessor.java | 7 +- .../continuous/CacheContinuousQueryManager.java | 31 +++-- .../processors/cache/IgniteCacheGroupsTest.java | 98 ++++++++++++++- 7 files changed, 239 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index c3311a8..d3be472 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -379,7 +379,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { startCache = cctx.cacheContext(cacheDesc.cacheId()) == null && - CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter()); + CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter()); } if (startCache) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index aed96e4..11efd77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -103,6 +104,9 @@ public class CacheGroupInfrastructure { private volatile List<GridCacheContext> caches; /** */ + private volatile List<GridCacheContext> contQryCaches; + + /** */ private final IgniteLogger log; /** */ @@ -129,6 +133,12 @@ public class CacheGroupInfrastructure { /** ReuseList instance this group is associated with */ private final ReuseList reuseList; + /** */ + private boolean drEnabled; + + /** */ + private boolean qryEnabled; + /** * @param grpId Group ID. * @param ctx Context. @@ -263,8 +273,14 @@ public class CacheGroupInfrastructure { assert add : cctx.name(); + if (!qryEnabled && QueryUtils.isEnabled(cctx.config())) + qryEnabled = true; + + if (!drEnabled && cctx.isDrEnabled()) + drEnabled = true; + this.caches = caches; - } + } /** * @param cctx Cache context. @@ -280,9 +296,39 @@ public class CacheGroupInfrastructure { assert sharedGroup() || caches.size() == 1 : caches.size(); it.remove(); + + break; } } + if (QueryUtils.isEnabled(cctx.config())) { + boolean qryEnabled = false; + + for (int i = 0; i < caches.size(); i++) { + if (QueryUtils.isEnabled(caches.get(i).config())) { + qryEnabled = true; + + break; + } + } + + this.qryEnabled = qryEnabled; + } + + if (cctx.isDrEnabled()) { + boolean drEnabled = false; + + for (int i = 0; i < caches.size(); i++) { + if (caches.get(i).isDrEnabled()) { + drEnabled = true; + + break; + } + } + + this.drEnabled = drEnabled; + } + this.caches = caches; } @@ -423,19 +469,25 @@ public class CacheGroupInfrastructure { } } - // TODO IGNITE-5075 + /** + * @return {@code True} if contains cache with query indexing enabled. + */ public boolean queriesEnabled() { - return QueryUtils.isEnabled(ccfg); + return qryEnabled; } - // TODO IGNITE-5075 see GridCacheContext#allowFastEviction + /** + * @return {@code True} if fast eviction is allowed. + */ public boolean allowFastEviction() { - return false; + return ctx.database().persistenceEnabled() && !queriesEnabled(); } - // TODO IGNITE-5075. + /** + * @return {@code True} in case replication is enabled. + */ public boolean isDrEnabled() { - return false; + return drEnabled; } /** @@ -533,6 +585,13 @@ public class CacheGroupInfrastructure { } /** + * @return Cache node filter. + */ + public IgnitePredicate<ClusterNode> nodeFilter() { + return ccfg.getNodeFilter(); + } + + /** * @return Configured user objects which should be initialized/stopped on group start/stop. */ Collection<?> configuredUserObjects() { @@ -668,6 +727,47 @@ public class CacheGroupInfrastructure { } /** + * @param cctx Cache context. + */ + public void addCacheWithContinuousQuery(GridCacheContext cctx) { + assert sharedGroup() : cacheOrGroupName(); + assert cctx.group() == this : cctx.name(); + + synchronized (this) { + List<GridCacheContext> contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + contQryCaches = new ArrayList<>(); + + contQryCaches.add(cctx); + + this.contQryCaches = contQryCaches; + } + } + + /** + * @param cctx Cache context. + */ + public void removeCacheWithContinuousQuery(GridCacheContext cctx) { + assert sharedGroup() : cacheOrGroupName(); + assert cctx.group() == this : cctx.name(); + + synchronized (this) { + List<GridCacheContext> contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + return; + + contQryCaches.remove(cctx); + + if (contQryCaches.isEmpty()) + contQryCaches = null; + + this.contQryCaches = contQryCaches; + } + } + + /** * @param cacheId ID of cache initiated counter update. * @param part Partition number. * @param cntr Counter. @@ -682,12 +782,15 @@ public class CacheGroupInfrastructure { if (isLocal()) return; - List<GridCacheContext> caches = this.caches; + List<GridCacheContext> contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + return; CounterSkipContext skipCtx = null; - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); + for (int i = 0; i < contQryCaches.size(); i++) { + GridCacheContext cctx = contQryCaches.get(i); if (cacheId != cctx.cacheId()) skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 370f07b..25f70da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -820,7 +820,9 @@ class ClusterCachesInfo { desc = desc0; } - if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) + if (locCfg != null || + joinDiscoData.startCaches() || + CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) locJoinStartCaches.add(new T2<>(desc, nearCfg)); } } @@ -1015,7 +1017,7 @@ class ClusterCachesInfo { assert old == null : old; - ctx.discovery().addCacheGroup(grpDesc, startedCacheCfg.getNodeFilter(), startedCacheCfg.getCacheMode()); + ctx.discovery().addCacheGroup(grpDesc, grpDesc.config().getNodeFilter(), startedCacheCfg.getCacheMode()); if (exchActions != null) exchActions.addCacheGroupToStart(grpDesc); http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3341d4c..aaa67ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -83,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; -import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.lang.GridFunc; @@ -2016,13 +2015,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return {@code True} if fast eviction is allowed. - */ - public boolean allowFastEviction() { - return shared().database().persistenceEnabled() && !QueryUtils.isEnabled(cacheCfg); - } - - /** * @param part Partition. * @param affNodes Affinity nodes. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e28edfb..3a380a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -836,12 +836,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheDescriptor desc : cacheDescriptors().values()) { CacheConfiguration c = desc.cacheConfiguration(); - IgnitePredicate filter = c.getNodeFilter(); + IgnitePredicate filter = desc.groupDescriptor().config().getNodeFilter(); if (c.getName().equals(conf.getName()) && ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) || CU.isSystemCache(c.getName()))) { - tmpCacheCfg.add(c); break; @@ -1820,7 +1819,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (started != null) { for (DynamicCacheDescriptor desc : started) { - IgnitePredicate<ClusterNode> filter = desc.cacheConfiguration().getNodeFilter(); + IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter(); if (CU.affinityNode(ctx.discovery().localNode(), filter)) { prepareCacheStart( @@ -1868,7 +1867,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.setNearConfiguration(null); } - else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) + else if (CU.affinityNode(ctx.discovery().localNode(), grpDesc.config().getNodeFilter())) affNode = true; else { affNode = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index fc39b6d..e1a5179 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -644,7 +644,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { hnd.localCache(cctx.isLocal()); IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? - F.nodeForNodeId(cctx.localNodeId()) : cctx.config().getNodeFilter(); + F.nodeForNodeId(cctx.localNodeId()) : cctx.group().nodeFilter(); assert pred != null : cctx.config(); @@ -820,16 +820,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { intLsnrCnt.incrementAndGet(); } else { - added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; + synchronized (this) { + added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; - if (added) { - lsnrCnt.incrementAndGet(); + if (added) { + int cnt = lsnrCnt.incrementAndGet(); - lsnr.onExecution(); + if (cctx.group().sharedGroup() && cnt == 1) + cctx.group().addCacheWithContinuousQuery(cctx); + } } + + if (added) + lsnr.onExecution(); } - return added ? GridContinuousHandler.RegisterStatus.REGISTERED : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; + return added ? GridContinuousHandler.RegisterStatus.REGISTERED : + GridContinuousHandler.RegisterStatus.NOT_REGISTERED; } /** @@ -847,11 +854,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } else { - if ((lsnr = lsnrs.remove(id)) != null) { - lsnrCnt.decrementAndGet(); + synchronized (this) { + if ((lsnr = lsnrs.remove(id)) != null) { + int cnt = lsnrCnt.decrementAndGet(); - lsnr.onUnregister(); + if (cctx.group().sharedGroup() && cnt == 0) + cctx.group().removeCacheWithContinuousQuery(cctx); + } } + + if (lsnr != null) + lsnr.onUnregister(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1105d1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 4529f77..1cc8999 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -35,11 +35,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import javax.cache.Cache; import javax.cache.CacheException; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -56,6 +57,7 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreAdapter; @@ -2419,6 +2421,98 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testContinuousQueriesMultipleGroups1() throws Exception { + continuousQueriesMultipleGroups(1); + } + + /** + * @throws Exception If failed. + */ + public void testContinuousQueriesMultipleGroups2() throws Exception { + continuousQueriesMultipleGroups(4); + } + + /** + * @param srvs Number of server nodes. + * @throws Exception If failed. + */ + private void continuousQueriesMultipleGroups(int srvs) throws Exception { + Ignite srv0 = startGrids(srvs); + + client = true; + + Ignite client = startGrid(srvs); + + client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false)); + client.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1, false)); + client.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false)); + + client.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1, false)); + client.createCache(cacheConfiguration(GROUP2, "c5", PARTITIONED, ATOMIC, 1, false)); + client.createCache(cacheConfiguration(GROUP2, "c6", PARTITIONED, TRANSACTIONAL, 1, false)); + + client.createCache(cacheConfiguration(null, "c7", PARTITIONED, ATOMIC, 1, false)); + client.createCache(cacheConfiguration(null, "c8", PARTITIONED, TRANSACTIONAL, 1, false)); + + String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}; + + AtomicInteger c1 = registerListener(client, "c1"); + + for (String cache : cacheNames) + srv0.cache(cache).put(1, 1); + + waitForEvents(c1, 1); + + for (String cache : cacheNames) + srv0.cache(cache).put(1, 1); + + waitForEvents(c1, 1); + } + + /** + * @param cntr Counter. + * @param expEvts Expected events number. + * @throws Exception If failed. + */ + private void waitForEvents(final AtomicInteger cntr, final int expEvts) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + if (cntr.get() < expEvts) + log.info("Wait for events [rcvd=" + cntr.get() + ", exp=" + expEvts + ']'); + + return false; + } + }, 5000); + + assertEquals(expEvts, cntr.get()); + assertTrue(cntr.compareAndSet(expEvts, 0)); + } + + /** + * @param node Node. + * @param cacheName Cache name. + * @return Received events counter. + */ + private AtomicInteger registerListener(Ignite node, String cacheName) { + ContinuousQuery qry = new ContinuousQuery(); + + final AtomicInteger cntr = new AtomicInteger(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) { + for (Object evt : iterable) + cntr.incrementAndGet(); + } + }); + + node.cache(cacheName).query(qry); + + return cntr; + } + + /** * */ static class Mapper1 implements AffinityKeyMapper {
