ignite-5075

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056b28fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056b28fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056b28fd

Branch: refs/heads/ignite-5075
Commit: 056b28fd3530691155156ac3c496223f5342629d
Parents: 70fc1a0
Author: sboikov <[email protected]>
Authored: Thu Jun 1 11:41:31 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jun 1 11:41:31 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  13 +--
 ...nuousQueryConcurrentPartitionUpdateTest.java | 106 +++++++++++--------
 2 files changed, 69 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/056b28fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index d472054..184e872 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -829,14 +829,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
         else {
             synchronized (this) {
-                added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
-
-                if (added) {
-                    int cnt = lsnrCnt.incrementAndGet();
-
-                    if (cctx.group().sharedGroup() && cnt == 1 && !cctx.isLocal())
+                if (lsnrCnt.get() == 0) {
+                    if (cctx.group().sharedGroup() && !cctx.isLocal())
                         cctx.group().addCacheWithContinuousQuery(cctx);
                 }
+
+                added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
+
+                if (added)
+                    lsnrCnt.incrementAndGet();
             }
 
             if (added)

http://git-wip-us.apache.org/repos/asf/ignite/blob/056b28fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index 32320f6..34cb777 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -129,7 +130,7 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
 
                 caches.add(cache.getName());
 
-                cntrs.add(startListener(cache));
+                cntrs.add(startListener(cache).get1());
             }
         }
         else {
@@ -142,7 +143,7 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
 
             caches.add(cache.getName());
 
-            cntrs.add(startListener(cache));
+            cntrs.add(startListener(cache).get1());
         }
 
         Affinity<Integer> aff = srv.affinity(caches.get(0));
@@ -206,7 +207,7 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
      * @param cache Cache.
      * @return Event counter.
      */
-    private AtomicInteger startListener(IgniteCache<Object, Object> cache) {
+    private T2<AtomicInteger, QueryCursor> startListener(IgniteCache<Object, Object> cache) {
         final AtomicInteger evtCnt = new AtomicInteger();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
@@ -217,14 +218,15 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
                     assertNotNull(evt.getKey());
                     assertNotNull(evt.getValue());
 
-                    evtCnt.incrementAndGet();
+                    if ((Integer)evt.getValue() >= 0)
+                        evtCnt.incrementAndGet();
                 }
             }
         });
 
-        cache.query(qry);
+        QueryCursor cur = cache.query(qry);
 
-        return evtCnt;
+        return new T2<>(evtCnt, cur);
     }
 
     /**
@@ -267,14 +269,33 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
 
         Ignite client = startGrid(1);
 
-        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+        List<String> caches = new ArrayList<>();
+
+        if (cacheGrp) {
+            for (int i = 0; i < 3; i++) {
+                CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME + i);
 
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicityMode(atomicityMode);
+                ccfg.setGroupName("testGroup");
+                ccfg.setWriteSynchronizationMode(FULL_SYNC);
+                ccfg.setAtomicityMode(atomicityMode);
 
-        IgniteCache clientCache = client.createCache(ccfg);
+                IgniteCache cache = client.createCache(ccfg);
 
-        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+                caches.add(cache.getName());
+            }
+        }
+        else {
+            CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+            ccfg.setAtomicityMode(atomicityMode);
+
+            IgniteCache cache = client.createCache(ccfg);
+
+            caches.add(cache.getName());
+        }
+
+        Affinity<Integer> aff = srv.affinity(caches.get(0));
 
         final List<Integer> keys = new ArrayList<>();
 
@@ -294,38 +315,27 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
         final int THREADS = 10;
         final int UPDATES = 1000;
 
-        for (int i = 0; i < 5; i++) {
-            log.info("Iteration: " + i);
-
-            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
-            final AtomicInteger evtCnt = new AtomicInteger();
-
-            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                    for (CacheEntryEvent evt : evts) {
-                        assertNotNull(evt.getKey());
-                        assertNotNull(evt.getValue());
-
-                        if ((Integer)evt.getValue() >= 0)
-                            evtCnt.incrementAndGet();
-                    }
-                }
-            });
+        final List<IgniteCache<Object, Object>> srvCaches = new ArrayList<>();
 
-            QueryCursor cur;
+        for (String cacheName : caches)
+            srvCaches.add(srv.cache(cacheName));
 
-            final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+        for (int i = 0; i < 5; i++) {
+            log.info("Iteration: " + i);
 
             final AtomicBoolean stop = new AtomicBoolean();
 
+            List<T2<AtomicInteger, QueryCursor> > qrys = new ArrayList<>();
+
             try {
                 IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                     @Override public Void call() throws Exception {
                         ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                        while (!stop.get())
-                            srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+                        while (!stop.get()) {
+                            for (IgniteCache<Object, Object> srvCache : srvCaches)
+                                srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+                        }
 
                         return null;
                     }
@@ -333,7 +343,8 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
 
                 U.sleep(1000);
 
-                cur = clientCache.query(qry);
+                for (String cache : caches)
+                    qrys.add(startListener(client.cache(cache)));
 
                 U.sleep(1000);
 
@@ -349,25 +360,32 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
                 @Override public Void call() throws Exception {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                    for (int i = 0; i < UPDATES; i++)
-                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+                    for (int i = 0; i < UPDATES; i++) {
+                        for (IgniteCache<Object, Object> srvCache : srvCaches)
+                            srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+                    }
 
                     return null;
                 }
             }, THREADS, "update");
 
+            for (T2<AtomicInteger, QueryCursor>  qry : qrys) {
+                final AtomicInteger evtCnt = qry.get1();
 
-            GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    log.info("Events: " + evtCnt.get());
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        log.info("Events: " + evtCnt.get());
 
-                    return evtCnt.get() >= THREADS * UPDATES;
-                }
-            }, 5000);
+                        return evtCnt.get() >= THREADS * UPDATES;
+                    }
+                }, 5000);
+            }
 
-            assertEquals(THREADS * UPDATES, evtCnt.get());
+            for (T2<AtomicInteger, QueryCursor>  qry : qrys) {
+                assertEquals(THREADS * UPDATES, qry.get1().get());
 
-            cur.close();
+                qry.get2().close();
+            }
         }
     }
 }

Reply via email to