Repository: ignite Updated Branches: refs/heads/ignite-5075 43fbc9352 -> e914572ff
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e914572f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e914572f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e914572f Branch: refs/heads/ignite-5075 Commit: e914572ffcd464264257f83d1f53b5aa68e8f141 Parents: 43fbc93 Author: sboikov <[email protected]> Authored: Tue May 30 16:06:55 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 30 18:52:27 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheGroupInfrastructure.java | 2 + .../processors/cache/GridCacheProcessor.java | 11 ++-- .../continuous/CacheContinuousQueryHandler.java | 68 +++++++++----------- .../continuous/CacheContinuousQueryManager.java | 4 +- .../processors/cache/IgniteCacheGroupsTest.java | 16 +++-- ...nuousQueryConcurrentPartitionUpdateTest.java | 8 +-- .../cache/IgniteCacheAbstractBenchmark.java | 5 +- 7 files changed, 59 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 97f9324..b7d8243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -732,6 +732,7 @@ public class CacheGroupInfrastructure { public void addCacheWithContinuousQuery(GridCacheContext cctx) { assert sharedGroup() : cacheOrGroupName(); assert cctx.group() == this : cctx.name(); + assert !cctx.isLocal() : cctx.name(); synchronized (this) { List<GridCacheContext> contQryCaches = this.contQryCaches; @@ -751,6 +752,7 @@ public class CacheGroupInfrastructure { public void removeCacheWithContinuousQuery(GridCacheContext cctx) { assert sharedGroup() : cacheOrGroupName(); assert cctx.group() == this : cctx.name(); + assert !cctx.isLocal() : cctx.name(); synchronized (this) { List<GridCacheContext> contQryCaches = this.contQryCaches; http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 82771b0..bee7860 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1303,7 +1303,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.group().stopCache(ctx, destroy); - U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore())); + U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), cache.configuration(), ctx.store().configuredStore())); if (log.isInfoEnabled()) { if (ctx.group().sharedGroup()) @@ -1465,7 +1465,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { prepare(cfg, toPrepare); - U.startLifecycleAware(lifecycleAwares(cfg, cfgStore)); + U.startLifecycleAware(lifecycleAwares(grp, cfg, cfgStore)); boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg); @@ -3455,14 +3455,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param grp Cache group. * @param ccfg Cache configuration. * @param objs Extra components. * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface. */ - private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object... objs) { + private Iterable<Object> lifecycleAwares(CacheGroupInfrastructure grp, CacheConfiguration ccfg, Object... objs) { Collection<Object> ret = new ArrayList<>(7 + objs.length); - ret.add(ccfg.getAffinity()); + if (grp.affinityFunction() != ccfg.getAffinity()) + ret.add(ccfg.getAffinity()); + ret.add(ccfg.getAffinityMapper()); ret.add(ccfg.getEvictionFilter()); ret.add(ccfg.getEvictionPolicy()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 3d56531..149bd69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -493,51 +493,47 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler long cntr, AffinityTopologyVersion topVer, boolean primary) { - CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part); - if (skipCtx == null) skipCtx = new CounterSkipContext(part, cntr, topVer); - final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary); + if (loc) { + assert !locCache; - if (entryOrList != null) { - if (loc && asyncCb) { - // TODO - return skipCtx; + final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry()); + + if (!evts.isEmpty()) { + if (asyncCb) { + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + locLsnr.onUpdated(evts); + } + }, part); + } + else + skipCtx.addSendClosure(new Runnable() { + @Override public void run() { + locLsnr.onUpdated(evts); + } + }); } + return skipCtx; + } + + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part); + + final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary); + + if (entryOrList != null) { skipCtx.addSendClosure(new Runnable() { @Override public void run() { try { - if (loc) { - if (entryOrList instanceof CacheContinuousQueryEntry) { - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null, - cctx, - (CacheContinuousQueryEntry)entryOrList); - - handleLocalListener(evt); - } - else { - List<CacheContinuousQueryEntry> list = - (List<CacheContinuousQueryEntry>)entryOrList; - - for (CacheContinuousQueryEntry entry : list) { - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null, - cctx, - entry); - - handleLocalListener(evt); - } - } - } - else { - ctx.continuous().addNotification(nodeId, - routineId, - entryOrList, - topic, - false, - true); - } + ctx.continuous().addNotification(nodeId, + routineId, + entryOrList, + topic, + false, + true); } catch (ClusterTopologyCheckedException ex) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 5156455..d472054 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -834,7 +834,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (added) { int cnt = lsnrCnt.incrementAndGet(); - if (cctx.group().sharedGroup() && cnt == 1) + if (cctx.group().sharedGroup() && cnt == 1 && !cctx.isLocal()) cctx.group().addCacheWithContinuousQuery(cctx); } } @@ -866,7 +866,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if ((lsnr = lsnrs.remove(id)) != null) { int cnt = lsnrCnt.decrementAndGet(); - if (cctx.group().sharedGroup() && cnt == 0) + if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal()) cctx.group().removeCacheWithContinuousQuery(cctx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 2076981..7b70472 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -595,13 +595,14 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(cls, "loaders"); } - assertTrue("Expected: [cntr1=" + keys + ", cntr2=" + keys + "] " + - "but was: [cntr1=" + cntr1.get() + ", cntr2=" + cntr2.get() + "]", - GridTestUtils.waitForCondition(new PA() { + GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return cntr1.get() == keys && cntr2.get() == keys; } - }, 2000)); + }, 2000); + + assertEquals(cntr1.get(), keys); + assertEquals(cntr2.get(), keys); qry1.close(); @@ -610,12 +611,13 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { srv0.cache(CACHE1).putAll(map); srv0.cache(CACHE2).putAll(map); - assertTrue("Expected: <" + keys + 10 + "> but was: <" + cntr2.get() + ">", - GridTestUtils.waitForCondition(new PA() { + GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return cntr2.get() == keys + 10; } - }, 2000)); + }, 2000); + + assertEquals(keys + 10, cntr2.get()); assertEquals(keys, cntr1.get()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java index ed0dec0..32320f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ -91,15 +91,15 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo /** * @throws Exception If failed. */ - public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception { - concurrentUpdatePartition(TRANSACTIONAL, true); + public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception { + concurrentUpdatePartition(ATOMIC, true); } /** * @throws Exception If failed. */ - public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception { - concurrentUpdatePartition(ATOMIC, true); + public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception { + concurrentUpdatePartition(TRANSACTIONAL, true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java index 6514dd7..9c0344d 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java @@ -233,8 +233,9 @@ public abstract class IgniteCacheAbstractBenchmark<K, V> extends IgniteAbstractB loadCacheData(cacheName); - println(cfg, "Finished populating data [cache=" + cacheName + - ", time=" + ((System.nanoTime() - start) / 1_000_000) + "ms]"); + long time = ((System.nanoTime() - start) / 1_000_000); + + println(cfg, "Finished populating data [cache=" + cacheName + ", time=" + time + "ms]"); } /**
