ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056b28fd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056b28fd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056b28fd Branch: refs/heads/ignite-5075 Commit: 056b28fd3530691155156ac3c496223f5342629d Parents: 70fc1a0 Author: sboikov <[email protected]> Authored: Thu Jun 1 11:41:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 1 11:41:31 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryManager.java | 13 +-- ...nuousQueryConcurrentPartitionUpdateTest.java | 106 +++++++++++-------- 2 files changed, 69 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/056b28fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index d472054..184e872 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -829,14 +829,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } else { synchronized (this) { - added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; - - if (added) { - int cnt = lsnrCnt.incrementAndGet(); - - if (cctx.group().sharedGroup() && cnt == 1 && !cctx.isLocal()) + if (lsnrCnt.get() == 0) { + if (cctx.group().sharedGroup() && !cctx.isLocal()) cctx.group().addCacheWithContinuousQuery(cctx); } + + added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; + + if (added) + lsnrCnt.incrementAndGet(); } if (added) http://git-wip-us.apache.org/repos/asf/ignite/blob/056b28fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java index 32320f6..34cb777 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -129,7 +130,7 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo caches.add(cache.getName()); - cntrs.add(startListener(cache)); + cntrs.add(startListener(cache).get1()); } } else { @@ -142,7 +143,7 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo caches.add(cache.getName()); - cntrs.add(startListener(cache)); + cntrs.add(startListener(cache).get1()); } Affinity<Integer> aff = srv.affinity(caches.get(0)); @@ -206,7 +207,7 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo * @param cache Cache. * @return Event counter. */ - private AtomicInteger startListener(IgniteCache<Object, Object> cache) { + private T2<AtomicInteger, QueryCursor> startListener(IgniteCache<Object, Object> cache) { final AtomicInteger evtCnt = new AtomicInteger(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -217,14 +218,15 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo assertNotNull(evt.getKey()); assertNotNull(evt.getValue()); - evtCnt.incrementAndGet(); + if ((Integer)evt.getValue() >= 0) + evtCnt.incrementAndGet(); } } }); - cache.query(qry); + QueryCursor cur = cache.query(qry); - return evtCnt; + return new T2<>(evtCnt, cur); } /** @@ -267,14 +269,33 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo Ignite client = startGrid(1); - CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + List<String> caches = new ArrayList<>(); + + if (cacheGrp) { + for (int i = 0; i < 3; i++) { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME + i); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicityMode(atomicityMode); + ccfg.setGroupName("testGroup"); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); - IgniteCache clientCache = client.createCache(ccfg); + IgniteCache cache = client.createCache(ccfg); - Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME); + caches.add(cache.getName()); + } + } + else { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + + IgniteCache cache = client.createCache(ccfg); + + caches.add(cache.getName()); + } + + Affinity<Integer> aff = srv.affinity(caches.get(0)); final List<Integer> keys = new ArrayList<>(); @@ -294,38 +315,27 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo final int THREADS = 10; final int UPDATES = 1000; - for (int i = 0; i < 5; i++) { - log.info("Iteration: " + i); - - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - - final AtomicInteger evtCnt = new AtomicInteger(); - - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - for (CacheEntryEvent evt : evts) { - assertNotNull(evt.getKey()); - assertNotNull(evt.getValue()); - - if ((Integer)evt.getValue() >= 0) - evtCnt.incrementAndGet(); - } - } - }); + final List<IgniteCache<Object, Object>> srvCaches = new ArrayList<>(); - QueryCursor cur; + for (String cacheName : caches) + srvCaches.add(srv.cache(cacheName)); - final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); final AtomicBoolean stop = new AtomicBoolean(); + List<T2<AtomicInteger, QueryCursor> > qrys = new ArrayList<>(); + try { IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - while (!stop.get()) - srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200); + while (!stop.get()) { + for (IgniteCache<Object, Object> srvCache : srvCaches) + srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200); + } return null; } @@ -333,7 +343,8 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo U.sleep(1000); - cur = clientCache.query(qry); + for (String cache : caches) + qrys.add(startListener(client.cache(cache))); U.sleep(1000); @@ -349,25 +360,32 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo @Override public Void call() throws Exception { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - for (int i = 0; i < UPDATES; i++) - srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + for (int i = 0; i < UPDATES; i++) { + for (IgniteCache<Object, Object> srvCache : srvCaches) + srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + } return null; } }, THREADS, "update"); + for (T2<AtomicInteger, QueryCursor> qry : qrys) { + final AtomicInteger evtCnt = qry.get1(); - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - log.info("Events: " + evtCnt.get()); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + log.info("Events: " + evtCnt.get()); - return evtCnt.get() >= THREADS * UPDATES; - } - }, 5000); + return evtCnt.get() >= THREADS * UPDATES; + } + }, 5000); + } - assertEquals(THREADS * UPDATES, evtCnt.get()); + for (T2<AtomicInteger, QueryCursor> qry : qrys) { + assertEquals(THREADS * UPDATES, qry.get1().get()); - cur.close(); + qry.get2().close(); + } } } }
