Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc 3484d7a16 -> 52b7b5bad


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52b7b5ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52b7b5ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52b7b5ba

Branch: refs/heads/ignite-5075-cc
Commit: 52b7b5bad8231ffa5fc89b746e1c97ad50842572
Parents: 3484d7a
Author: sboikov <[email protected]>
Authored: Wed May 24 14:26:13 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed May 24 14:26:13 2017 +0300

----------------------------------------------------------------------
 ...nuousQueryConcurrentPartitionUpdateTest.java | 133 ++++++++++++++++++-
 1 file changed, 130 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/52b7b5ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index 643257a..9c7c836 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -29,9 +30,12 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -155,8 +159,6 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
                 }
             }, THREADS, "update");
 
-            log.info("Finished update");
-
             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
                     log.info("Events: " + evtCnt.get());
@@ -171,7 +173,132 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
         }
     }
 
-    public void testConcurrentUpdateAndQueryStart() throws Exception {
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception {
+        concurrentUpdatesAndQueryStart(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartTx() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL);
+    }
 
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        Ignite client = startGrid(1);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache clientCache = client.createCache(ccfg);
+
+        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+
+        final List<Integer> keys = new ArrayList<>();
+
+        final int KEYS = 10;
+
+        for (int i = 0; i < 100_000; i++) {
+            if (aff.partition(i) == 0) {
+                keys.add(i);
+
+                if (keys.size() == KEYS)
+                    break;
+            }
+        }
+
+        assertEquals(KEYS, keys.size());
+
+        final int THREADS = 10;
+        final int UPDATES = 1000;
+
+        for (int i = 0; i < 5; i++) {
+            log.info("Iteration: " + i);
+
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            final AtomicInteger evtCnt = new AtomicInteger();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                    for (CacheEntryEvent evt : evts) {
+                        assertNotNull(evt.getKey());
+                        assertNotNull(evt.getValue());
+
+                        if ((Integer)evt.getValue() >= 0)
+                            evtCnt.incrementAndGet();
+                    }
+                }
+            });
+
+            QueryCursor cur;
+
+            final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (!stop.get())
+                            srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+
+                        return null;
+                    }
+                }, THREADS, "update");
+
+                U.sleep(1000);
+
+                cur = clientCache.query(qry);
+
+                U.sleep(1000);
+
+                stop.set(true);
+
+                fut.get();
+            }
+            finally {
+                stop.set(true);
+            }
+
+            GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < UPDATES; i++)
+                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+
+                    return null;
+                }
+            }, THREADS, "update");
+
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    log.info("Events: " + evtCnt.get());
+
+                    return evtCnt.get() >= THREADS * UPDATES;
+                }
+            }, 5000);
+
+            assertEquals(THREADS * UPDATES, evtCnt.get());
+
+            cur.close();
+        }
     }
 }

Reply via email to