Repository: ignite Updated Branches: refs/heads/ignite-5075-cc 3484d7a16 -> 52b7b5bad
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52b7b5ba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52b7b5ba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52b7b5ba Branch: refs/heads/ignite-5075-cc Commit: 52b7b5bad8231ffa5fc89b746e1c97ad50842572 Parents: 3484d7a Author: sboikov <[email protected]> Authored: Wed May 24 14:26:13 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 24 14:26:13 2017 +0300 ---------------------------------------------------------------------- ...nuousQueryConcurrentPartitionUpdateTest.java | 133 ++++++++++++++++++- 1 file changed, 130 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/52b7b5ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java index 643257a..9c7c836 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; @@ -29,9 +30,12 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -155,8 +159,6 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo } }, THREADS, "update"); - log.info("Finished update"); - GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { log.info("Events: " + evtCnt.get()); @@ -171,7 +173,132 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo } } - public void testConcurrentUpdateAndQueryStart() throws Exception { + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception { + concurrentUpdatesAndQueryStart(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatesAndQueryStartTx() throws Exception { + concurrentUpdatesAndQueryStart(TRANSACTIONAL); + } + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception { + Ignite srv = startGrid(0); + + client = true; + + Ignite client = startGrid(1); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + + IgniteCache clientCache = client.createCache(ccfg); + + Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME); + + final List<Integer> keys = new ArrayList<>(); + + final int KEYS = 10; + + for (int i = 0; i < 100_000; i++) { + if (aff.partition(i) == 0) { + keys.add(i); + + if (keys.size() == KEYS) + break; + } + } + + assertEquals(KEYS, keys.size()); + + final int THREADS = 10; + final int UPDATES = 1000; + + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final AtomicInteger evtCnt = new AtomicInteger(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent evt : evts) { + assertNotNull(evt.getKey()); + assertNotNull(evt.getValue()); + + if ((Integer)evt.getValue() >= 0) + evtCnt.incrementAndGet(); + } + } + }); + + QueryCursor cur; + + final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) + srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200); + + return null; + } + }, THREADS, "update"); + + U.sleep(1000); + + cur = clientCache.query(qry); + + U.sleep(1000); + + stop.set(true); + + fut.get(); + } + finally { + stop.set(true); + } + + GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < UPDATES; i++) + srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + + return null; + } + }, THREADS, "update"); + + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + log.info("Events: " + evtCnt.get()); + + return evtCnt.get() >= THREADS * UPDATES; + } + }, 5000); + + assertEquals(THREADS * UPDATES, evtCnt.get()); + + cur.close(); + } } }
