Repository: ignite Updated Branches: refs/heads/ignite-2515 1bfd9e470 -> 0295a56e1
IGNITE-2515 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0295a56e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0295a56e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0295a56e Branch: refs/heads/ignite-2515 Commit: 0295a56e174713bdf19dc79dd27e37b16178a931 Parents: 1bfd9e4 Author: nikolay_tikhonov <[email protected]> Authored: Mon Feb 15 21:05:51 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Feb 15 21:05:51 2016 +0300 ---------------------------------------------------------------------- .../cache/query/CacheQueryEntryEvent.java | 47 +++ .../continuous/CacheContinuousQueryEvent.java | 11 +- .../continuous/CacheContinuousQueryHandler.java | 67 ++-- ...acheContinuousQueryRandomOperationsTest.java | 352 +++++++++++++------ 4 files changed, 353 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java new file mode 100644 index 0000000..6fda0cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.EventType; + +/** + * A Cache continuous query entry event. + * + * @param <K> the type of key + * @param <V> the type of value + */ +public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> { + /** + * Constructs a cache entry event from a given cache as source. + * + * @param source the cache that originated the event + * @param eventType Event type. + */ + public CacheQueryEntryEvent(Cache source, EventType eventType) { + super(source, eventType); + } + + /** + * Gets the partition update counter for this event; the counter is incremented on each update. + * + * @return Value of counter for this event. 
+ */ + public abstract long getPartitionUpdateCounter(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index d1c7c28..eab5dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import javax.cache.Cache; -import javax.cache.event.CacheEntryEvent; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * Continuous query event. 
*/ -class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { +class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -77,11 +77,14 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ + @Override public long getPartitionUpdateCounter() { + return e.updateCounter(); + } + + /** {@inheritDoc} */ @Override public <T> T unwrap(Class<T> cls) { if (cls.isAssignableFrom(getClass())) return cls.cast(this); - else if (cls == Long.class) - return cls.cast(e.updateCounter()); throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 267948d..cf622a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -368,11 +368,55 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!internal && !skipPrimaryCheck) sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + + if (recordIgniteEvt) { + for (CacheEntryEvent<? extends K, ? 
extends V> e : evts) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + e.getKey(), + e.getValue(), + e.getOldValue(), + null + )); + } + } } } else { - if (!entry.isFiltered()) + if (!entry.isFiltered()) { locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + + if (recordIgniteEvt) + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } } } else { @@ -406,27 +450,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler catch (IgniteCheckedException ex) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); } - - if (recordIgniteEvt && notify) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - rmtFilter, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); - } } @Override public void onUnregister() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index 3cd58d6..abbb807 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -20,13 +20,16 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; @@ -41,6 +44,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.store.CacheStore; @@ -53,6 +57,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import static 
java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -65,6 +71,9 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; /** * @@ -77,13 +86,13 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract private static final int NODES = 5; /** */ - private static final int KEYS = 10; + private static final int KEYS = 50; /** */ private static final int VALS = 10; /** */ - public static final int ITERATION_CNT = 1000; + public static final int ITERATION_CNT = 100; /** */ private boolean client; @@ -127,7 +136,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -140,7 +149,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -153,7 +162,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -166,7 +175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, 
false); + testContinuousQuery(ccfg, SERVER); } /** @@ -179,7 +188,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, ALL); } /** @@ -192,7 +201,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -205,7 +214,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -218,7 +227,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -231,7 +240,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -244,7 +253,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -257,7 +266,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -270,7 +279,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -283,7 +292,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -296,7 +305,7 @@ public class 
CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -309,7 +318,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -322,7 +331,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -335,7 +344,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -348,7 +357,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, true); + testContinuousQuery(ccfg, SERVER); } /** @@ -361,7 +370,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -374,7 +383,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -387,7 +396,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -400,7 +409,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -413,7 +422,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, 
false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -426,7 +435,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -439,7 +448,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, false, true); + testContinuousQuery(ccfg, SERVER); } /** @@ -452,7 +461,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -465,7 +474,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -478,7 +487,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -491,7 +500,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** @@ -504,7 +513,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg, true, true); + testContinuousQuery(ccfg, CLIENT); } /** @@ -517,7 +526,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -530,7 +539,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, null, false); + testContinuousQuery(ccfg, ALL); } /** @@ -543,7 +552,7 @@ public class 
CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, false, false); + testContinuousQuery(ccfg, SERVER); } /** @@ -556,43 +565,52 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg, true, false); + testContinuousQuery(ccfg, CLIENT); } /** * @param ccfg Cache configuration. - * @param client Client. If {@code null} then listener will be registered on all nodes. - * @param expTx Explicit tx. + * @param deploy The place where continuous query will be started. * @throws Exception If failed. */ - private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, Boolean client, boolean expTx) + private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) throws Exception { ignite(0).createCache(ccfg); try { - IgniteCache<Object, Object> cache = null; - - if (client != null) { - if (client) - cache = ignite(NODES - 1).cache(ccfg.getName()); - else - cache = ignite(0).cache(ccfg.getName()); - } - long seed = System.currentTimeMillis(); Random rnd = new Random(seed); log.info("Random seed: " + seed); - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = - new ArrayBlockingQueue<>(50_000); + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); Collection<QueryCursor<?>> curs = new ArrayList<>(); - if (cache != null) { + if (deploy == CLIENT) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + + evtsQueues.add(evtsQueue); + + QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry); + + curs.add(cur); + } + else if 
(deploy == SERVER) { ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { for (CacheEntryEvent<?, ?> evt : evts) @@ -600,7 +618,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } }); - QueryCursor<?> cur = cache.query(qry); + evtsQueues.add(evtsQueue); + + QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry); curs.add(cur); } @@ -608,6 +628,8 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract for (int i = 0; i < NODES - 1; i++) { ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { for (CacheEntryEvent<?, ?> evt : evts) @@ -615,12 +637,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } }); + evtsQueues.add(evtsQueue); + QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry); curs.add(cur); } - - cache = ignite(ThreadLocalRandom.current().nextInt(NODES - 1)).cache(ccfg.getName()); } ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); @@ -629,10 +651,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract try { for (int i = 0; i < ITERATION_CNT; i++) { - if (i % 100 == 0) + if (i % 20 == 0) log.info("Iteration: " + i); - randomUpdate(rnd, evtsQueue, expData, partCntr, cache, expTx, curs.size()); + for (int idx = 0; idx < NODES; idx++) + randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); } } finally { @@ -647,35 +670,31 @@ public class 
CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * @param rnd Random generator. - * @param evtsQueue Events queue. + * @param evtsQueues Events queue. * @param expData Expected cache data. * @param partCntr Partition counter. * @param cache Cache. - * @param expTx Explicit TX. - * @param qryCnt Query count. * @throws Exception If failed. */ private void randomUpdate( Random rnd, - BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue, + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, ConcurrentMap<Object, Object> expData, Map<Integer, Long> partCntr, - IgniteCache<Object, Object> cache, - boolean expTx, - int qryCnt) + IgniteCache<Object, Object> cache) throws Exception { Object key = new QueryTestKey(rnd.nextInt(KEYS)); Object newVal = value(rnd); Object oldVal = expData.get(key); - int op = rnd.nextInt(11); + int op = rnd.nextInt(13); Ignite ignite = cache.unwrap(Ignite.class); Transaction tx = null; - if (expTx && cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(); + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); try { // log.info("Random operation [key=" + key + ", op=" + op + ']'); @@ -689,7 +708,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); @@ -704,7 +723,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, 
newVal, oldVal); expData.put(key, newVal); @@ -719,7 +738,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); expData.remove(key); @@ -734,7 +753,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); expData.remove(key); @@ -749,7 +768,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); @@ -764,7 +783,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); expData.remove(key); @@ -780,12 +799,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (oldVal == null) { updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); expData.put(key, newVal); } else - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); break; } @@ -799,12 +818,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (oldVal == null) { updatePartitionCounter(cache, key, partCntr); - 
waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); expData.put(key, newVal); } else - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); break; } @@ -818,12 +837,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (oldVal != null) { updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); } else - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); break; } @@ -837,12 +856,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (oldVal != null) { updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); } else - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); break; } @@ -861,7 +880,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract updatePartitionCounter(cache, key, partCntr); - waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); } @@ -871,7 +890,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (tx != null) tx.commit(); - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); } } else { @@ -880,14 +899,57 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract if (tx != null) tx.commit(); - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); } break; } + case 11: { + SortedMap<Object, Object> vals = new TreeMap<>(); + + while (vals.size() < KEYS / 5) + vals.put(new 
QueryTestKey(rnd.nextInt(KEYS)), value(rnd)); + + cache.putAll(vals); + + if (tx != null) + tx.commit(); + + for (Map.Entry<Object, Object> e : vals.entrySet()) + updatePartitionCounter(cache, e.getKey(), partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData); + + expData.putAll(vals); + + break; + } + + case 12: { + SortedMap<Object, Object> vals = new TreeMap<>(); + + while (vals.size() < KEYS / 5) + vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal); + + cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + for (Map.Entry<Object, Object> e : vals.entrySet()) + updatePartitionCounter(cache, e.getKey(), partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData); + + for (Object o : vals.keySet()) + expData.put(o, newVal); + + break; + } + default: - fail(); + fail("Op:" + op); } } finally { if (tx != null) @@ -896,6 +958,83 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** + * @param evtsQueues Queue. + * @param partCntrs Counters. + * @param aff Affinity. + * @param vals Values. + * @param expData Expected data. 
+ */ + private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, + Map<Integer, Long> partCntrs, + Affinity<Object> aff, + SortedMap<Object, Object> vals, + Map<Object, Object> expData) + throws Exception { + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>(); + + for (int i = 0; i < vals.size(); i++) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); + + rcvEvts.put(evt.getKey(), evt); + } + + assertEquals(vals.size(), rcvEvts.size()); + + for (Map.Entry<Object, Object> e : vals.entrySet()) { + Object key = e.getKey(); + Object val = e.getValue(); + Object oldVal = expData.get(key); + + if (val == null && oldVal == null) { + checkNoEvent(evtsQueues); + + return; + } + + CacheEntryEvent evt = rcvEvts.get(key); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', + evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); + + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class); + + assertNotNull(cntr); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); + } + } + } + + /** + * @param rnd {@link Random}. + * @return Isolation level picked uniformly at random. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + double val = rnd.nextDouble(); + + if (val < 1.0 / 3) // Floating-point thresholds: integer 1/3 and 2/3 both evaluate to 0. + return TransactionIsolation.READ_COMMITTED; + else if (val < 2.0 / 3) + return TransactionIsolation.REPEATABLE_READ; + else + return TransactionIsolation.SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? 
TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** * @param cache Cache. * @param key Key * @param cntrs Partition counters. @@ -922,28 +1061,28 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** - * @param evtsQueue Event queue. + * @param evtsQueues Event queue. * @param partCntrs Partition counters. * @param aff Affinity function. * @param key Key. * @param val Value. * @param oldVal Old value. - * @param qryCnt Query count. * @throws Exception If failed. */ - private void waitAndCheckEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue, + private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, Map<Integer, Long> partCntrs, Affinity<Object> aff, Object key, Object val, - Object oldVal, int qryCnt) throws Exception { + Object oldVal) + throws Exception { if (val == null && oldVal == null) { - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); return; } - for (int i = 0; i < qryCnt; i++) { + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); @@ -951,21 +1090,26 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract assertEquals(val, evt.getValue()); assertEquals(oldVal, evt.getOldValue()); - Long cntr = partCntrs.get(aff.partition(key)); + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); assertNotNull(cntr); - assertEquals(cntr, evt.unwrap(Long.class)); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); } } /** - * @param evtsQueue Event queue. + * @param evtsQueues Event queue. * @throws Exception If failed. 
*/ - private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); + private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception { + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); - assertNull(evt); + assertNull(evt); + } } /** @@ -1029,7 +1173,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * */ - static class QueryTestKey implements Serializable { + static class QueryTestKey implements Serializable, Comparable { /** */ private final Integer key; @@ -1062,6 +1206,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract @Override public String toString() { return S.toString(QueryTestKey.class, this); } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key.compareTo(((QueryTestKey)o).key); + } } /** @@ -1146,4 +1295,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract return S.toString(EntrySetValueProcessor.class, this); } } + + /** + * The place where the continuous query under test is started. + */ + protected enum ContinuousDeploy { + CLIENT, SERVER, ALL + } }
