Fixed "IGNITE-2515 Make 'updateCntr' available through CacheContinuousQueryEvent public API"
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb35e1d7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb35e1d7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb35e1d7 Branch: refs/heads/ignite-2407 Commit: cb35e1d7eaaea470e0afca99d5de9b0aec3e58ae Parents: 60b6f09 Author: nikolay_tikhonov <[email protected]> Authored: Tue Feb 16 17:44:38 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Feb 16 17:44:38 2016 +0300 ---------------------------------------------------------------------- .../cache/query/CacheQueryEntryEvent.java | 48 + .../continuous/CacheContinuousQueryEvent.java | 17 +- .../continuous/CacheContinuousQueryHandler.java | 95 +- .../CacheContinuousQueryListener.java | 3 +- .../continuous/CacheContinuousQueryManager.java | 26 +- ...CacheContinuousQueryCounterAbstractTest.java | 613 +++++++++++++ ...inuousQueryCounterPartitionedAtomicTest.java | 41 + ...ContinuousQueryCounterPartitionedTxTest.java | 41 + ...tinuousQueryCounterReplicatedAtomicTest.java | 41 + ...eContinuousQueryCounterReplicatedTxTest.java | 41 + ...acheContinuousQueryRandomOperationsTest.java | 896 ++++++++++++++++--- .../IgniteCacheQuerySelfTestSuite.java | 8 + 12 files changed, 1652 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java new file mode 100644 index 0000000..2c1c5e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.EventType; + +/** + * A Cache continuous query entry event. + * + * @param <K> the type of key + * @param <V> the type of value + */ +public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> { + /** + * Constructs a cache entry event from a given cache as source. + * + * @param source the cache that originated the event + * @param eventType Event type. + */ + public CacheQueryEntryEvent(Cache source, EventType eventType) { + super(source, eventType); + } + + /** + * Each cache update increases the partition counter. The same cache update has the same counter value + * on primary and backup nodes. This value can be useful to communicate with external applications. + * + * @return Value of counter for this event. 
+ */ + public abstract long getPartitionUpdateCounter(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index d26e666..eab5dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import javax.cache.Cache; -import javax.cache.event.CacheEntryEvent; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * Continuous query event. 
*/ -class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { +class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -57,8 +57,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ - @Override - public K getKey() { + @Override public K getKey() { return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false); } @@ -68,8 +67,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ - @Override - public V getOldValue() { + @Override public V getOldValue() { return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.oldValue(), e.isKeepBinary(), false); } @@ -79,8 +77,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ + @Override public long getPartitionUpdateCounter() { + return e.updateCounter(); + } + + /** {@inheritDoc} */ @Override public <T> T unwrap(Class<T> cls) { - if(cls.isAssignableFrom(getClass())) + if (cls.isAssignableFrom(getClass())) return cls.cast(this); throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 498f37d..08fe62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -325,9 +325,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler // skipPrimaryCheck is set only when listen locally for replicated cache events. assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); - boolean notify = true; + boolean notify = !evt.entry().isFiltered(); - if (rmtFilter != null) { + if (notify && rmtFilter != null) { try { notify = rmtFilter.evaluate(evt); } @@ -472,38 +472,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); } - @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) { - try { - assert evt != null; - - CacheContinuousQueryEntry e = evt.entry(); - - EntryBuffer buf = entryBufs.get(e.partition()); + @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, + boolean primary) { + assert evt != null; - if (buf == null) { - buf = new EntryBuffer(); - - EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); - - if (oldRec != null) - buf = oldRec; - } + CacheContinuousQueryEntry e = evt.entry(); - e = buf.skipEntry(e); - - if (e != null && !ctx.localNodeId().equals(nodeId)) - ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true); - } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); + e.markFiltered(); - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } + onEntryUpdated(evt, primary, false); } @Override public void onPartitionEvicted(int part) { @@ 
-618,20 +595,22 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler for (CacheContinuousQueryEntry e : entries) entries0.addAll(handleEvent(ctx, e)); - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); + if (!entries0.isEmpty()) { + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, + new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { + @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } } - } - ); + ); - locLsnr.onUpdated(evts); + locLsnr.onUpdated(evts); + } } /** @@ -731,11 +710,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param initCntr Update counters. */ public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { - assert topVer.topologyVersion() > 0 : topVer; - this.log = log; if (initCntr != null) { + assert topVer.topologyVersion() > 0 : topVer; + this.lastFiredEvt = initCntr; curTop = topVer; @@ -878,32 +857,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** - * @param e Entry. - * @return Continuous query entry. 
- */ - private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) { - if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) { - e.markFiltered(); - - return e; - } - else { - buf.add(e.updateCounter()); - - // Double check. If another thread sent a event with counter higher than this event. - if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { - buf.remove(e.updateCounter()); - - e.markFiltered(); - - return e; - } - else - return null; - } - } - - /** * Add continuous entry. * * @param e Cache continuous query entry. http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index dce04de..83ff32c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -67,8 +67,9 @@ public interface CacheContinuousQueryListener<K, V> { /** * @param evt Event * @param topVer Topology version. + * @param primary Primary */ - public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer); + public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary); /** * @param part Partition. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 968fc23..840a61b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -166,7 +166,27 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param topVer Topology version. */ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, - KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) { + KeyCacheObject key, + int partId, + long updCntr, + AffinityTopologyVersion topVer) { + skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer); + } + + /** + * @param lsnrs Listeners to notify. + * @param key Entry key. + * @param partId Partition id. + * @param updCntr Updated counter. + * @param topVer Topology version. + * @param primary Primary. 
+ */ + public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, + KeyCacheObject key, + int partId, + long updCntr, + boolean primary, + AffinityTopologyVersion topVer) { assert lsnrs != null; for (CacheContinuousQueryListener lsnr : lsnrs.values()) { @@ -184,7 +204,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.skipUpdateEvent(evt, topVer); + lsnr.skipUpdateEvent(evt, topVer, primary); } } @@ -281,7 +301,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean hasOldVal = oldVal != null; if (!hasNewVal && !hasOldVal) { - skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer); + skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java new file mode 100644 index 0000000..d8a5006 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; 
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Continuous queries counter tests. + */ +public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommonAbstractTest + implements Serializable { + /** */ + protected static final String CACHE_NAME = "test_cache"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Latch timeout. */ + protected static final long LATCH_TIMEOUT = 5000; + + /** */ + private static final String NO_CACHE_GRID_NAME = "noCacheGrid"; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(peerClassLoadingEnabled()); + + if (gridName.equals(NO_CACHE_GRID_NAME)) + cfg.setClientMode(true); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** + * @return Cache configuration. 
+ */ + @NotNull private CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(CACHE_NAME); + cacheCfg.setCacheMode(cacheMode()); + cacheCfg.setAtomicityMode(atomicityMode()); + cacheCfg.setNearConfiguration(nearConfiguration()); + cacheCfg.setRebalanceMode(ASYNC); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setCacheStoreFactory(new StoreFactory()); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + cacheCfg.setLoadPreviousValue(true); + + return cacheCfg; + } + + /** + * @return Peer class loading enabled flag. + */ + protected boolean peerClassLoadingEnabled() { + return true; + } + + /** + * @return Distribution. + */ + protected NearCacheConfiguration nearConfiguration() { + return new NearCacheConfiguration(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + for (int i = 0; i < gridCount(); i++) { + if (grid(i).cluster().nodes().size() != gridCount()) + return false; + } + + return true; + } + }, 3000); + + for (int i = 0; i < gridCount(); i++) + grid(i).destroyCache(CACHE_NAME); + + for (int i = 0; i < gridCount(); i++) + grid(i).getOrCreateCache(cacheConfiguration()); + } + + /** + * @return Cache mode. + */ + protected abstract CacheMode cacheMode(); + + /** + * @return Atomicity mode. + */ + protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * @return Grids count. + */ + protected abstract int gridCount(); + + /** + * @throws Exception If failed. 
+ */ + public void testAllEntries() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>(); + final CountDownLatch latch = new CountDownLatch(5); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { + synchronized (map) { + List<T2<Integer, Long>> vals = map.get(e.getKey()); + + if (vals == null) { + vals = new ArrayList<>(); + + map.put(e.getKey(), vals); + } + + vals.add(new T2<>(e.getValue(), e + .unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter())); + } + + latch.countDown(); + } + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); + + cache.remove(2); + + cache.put(1, 10); + + assert latch.await(LATCH_TIMEOUT, MILLISECONDS); + + assertEquals(3, map.size()); + + List<T2<Integer, Long>> vals = map.get(1); + + assertNotNull(vals); + assertEquals(2, vals.size()); + assertEquals(1, (int)vals.get(0).get1()); + assertEquals(1L, (long)vals.get(0).get2()); + assertEquals(10, (int)vals.get(1).get1()); + assertEquals(2L, (long)vals.get(1).get2()); + + vals = map.get(2); + + assertNotNull(vals); + assertEquals(2, vals.size()); + assertEquals(2, (int)vals.get(0).get1()); + assertEquals(1L, (long)vals.get(0).get2()); + assertNull(vals.get(1).get1()); + + vals = map.get(3); + + assertNotNull(vals); + assertEquals(1, vals.size()); + assertEquals(3, (int)vals.get(0).get1()); + assertEquals(1L, (long)vals.get(0).get2()); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTwoQueryListener() throws Exception { + if (cacheMode() == LOCAL) + return; + + final IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME); + final IgniteCache<Integer, Integer> cache1 = grid(1).cache(CACHE_NAME); + + final AtomicInteger cntr = new AtomicInteger(0); + final AtomicInteger cntr1 = new AtomicInteger(0); + + final ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>(); + final ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>(); + + final Map<Integer, List<T2<Integer, Long>>> map1 = new HashMap<>(); + final Map<Integer, List<T2<Integer, Long>>> map2 = new HashMap<>(); + + qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { + cntr.incrementAndGet(); + + synchronized (map1) { + List<T2<Integer, Long>> vals = map1.get(e.getKey()); + + if (vals == null) { + vals = new ArrayList<>(); + + map1.put(e.getKey(), vals); + } + + vals.add(new T2<>(e.getValue(), + e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter())); + } + } + } + }); + + qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? 
extends Integer> e : evts) { + cntr1.incrementAndGet(); + + synchronized (map2) { + List<T2<Integer, Long>> vals = map2.get(e.getKey()); + + if (vals == null) { + vals = new ArrayList<>(); + + map2.put(e.getKey(), vals); + } + + vals.add(new T2<>(e.getValue(), + e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter())); + } + } + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2); + QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Object, Object> cache0 = grid(i).cache(CACHE_NAME); + + cache0.put(1, 1); + cache0.put(2, 2); + cache0.put(3, 3); + + cache0.remove(1); + cache0.remove(2); + cache0.remove(3); + + final int iter = i + 1; + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return iter * 6 /* count of operations */ * 2 /* count of continuous queries */ + == (cntr.get() + cntr1.get()); + } + }, 5000L); + + checkEvents(map1, i); + + map1.clear(); + + checkEvents(map2, i); + + map2.clear(); + } + } + } + + /** + * @param evnts Events. + * @param iter Iteration. 
+ */ + private void checkEvents(Map<Integer, List<T2<Integer, Long>>> evnts, long iter) { + List<T2<Integer, Long>> val = evnts.get(1); + + assertEquals(val.size(), 2); + + // Check put 1 + assertEquals(iter * 2 + 1, (long)val.get(0).get2()); + assertEquals(1L, (long)val.get(0).get1()); + + // Check remove 1 + assertEquals(iter * 2 + 2, (long)val.get(1).get2()); + assertNull(val.get(1).get1()); + + val = evnts.get(2); + + assertEquals(val.size(), 2); + + // Check put 2 + assertEquals(iter * 2 + 1, (long)val.get(0).get2()); + assertEquals(2L, (long)val.get(0).get1()); + + // Check remove 2 + assertEquals(iter * 2 + 2, (long)val.get(1).get2()); + assertNull(val.get(1).get1()); + + val = evnts.get(3); + + assertEquals(val.size(), 2); + + // Check put 3 + assertEquals(iter * 2 + 1, (long)val.get(0).get2()); + assertEquals(3L, (long)val.get(0).get1()); + + // Check remove 3 + assertEquals(iter * 2 + 2, (long)val.get(1).get2()); + assertNull(val.get(1).get1()); + } + + /** + * @throws Exception If failed. + */ + public void testRestartQuery() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME); + + final int keyCnt = 300; + + final int updateKey = 1; + + for (int i = 0; i < keyCnt; i++) + cache.put(updateKey, i); + + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + final AtomicInteger cntr = new AtomicInteger(0); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + final List<T2<Integer, Long>> vals = new ArrayList<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated( + Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? 
extends Integer> e : evts) { + cntr.incrementAndGet(); + + synchronized (vals) { + vals.add(new T2<>(e.getValue(), + e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter())); + } + } + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache.query(qry)) { + for (int key = 0; key < keyCnt; key++) + cache.put(updateKey, cache.get(updateKey) + 1); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cntr.get() == keyCnt; + } + }, 2000L); + + for (T2<Integer, Long> val : vals) { + assertEquals(vals.size(), keyCnt); + + assertEquals((long)val.get1() + 1, (long)val.get2()); + } + } + } + else { + for (int key = 0; key < keyCnt; key++) + cache.put(updateKey, cache.get(updateKey) + 1); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testEntriesByFilter() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>(); + final CountDownLatch latch = new CountDownLatch(8); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { + synchronized (map) { + List<T2<Integer, Long>> vals = map.get(e.getKey()); + + if (vals == null) { + vals = new ArrayList<>(); + + map.put(e.getKey(), vals); + } + + vals.add(new T2<>(e.getValue(), + e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter())); + } + + latch.countDown(); + } + } + }); + + qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer,Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? 
extends Integer> evt) { + return evt.getValue() % 2 == 0; + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(1, 2); + cache.put(1, 3); + cache.put(1, 4); + + cache.put(2, 1); + cache.put(2, 2); + cache.put(2, 3); + cache.put(2, 4); + + cache.remove(1); + cache.remove(2); + + cache.put(1, 10); + cache.put(2, 40); + + assert latch.await(LATCH_TIMEOUT, MILLISECONDS); + + assertEquals(2, map.size()); + + List<T2<Integer, Long>> vals = map.get(1); + + assertNotNull(vals); + assertEquals(4, vals.size()); + + assertEquals((int)vals.get(0).get1(), 2); + assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2()); + + assertEquals((int)vals.get(1).get1(), 4); + assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2()); + + assertNull(vals.get(2).get1()); + assertEquals(5, (long)vals.get(2).get2()); + + assertEquals((int)vals.get(3).get1(), 10); + assertEquals(6, (long)vals.get(3).get2()); + + vals = map.get(2); + + assertNotNull(vals); + assertEquals(4, vals.size()); + + assertEquals((int)vals.get(0).get1(), 2); + assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2()); + + assertEquals((int)vals.get(1).get1(), 4); + assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2()); + + assertNull(vals.get(2).get1()); + assertEquals(5, (long)vals.get(2).get2()); + + assertEquals((int)vals.get(3).get1(), 40); + assertEquals(6, (long)vals.get(3).get2()); + } + } + + /** + * @throws Exception If failed. + */ + public void testLoadCache() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + final Map<Integer, T2<Integer, Long>> map = new ConcurrentHashMap8<>(); + final CountDownLatch latch = new CountDownLatch(10); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? 
extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) { + map.put(e.getKey(), new T2<>(e.getValue(), + e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter())); + + latch.countDown(); + } + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { + cache.loadCache(null, 0); + + assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount(); + + assertEquals(10, map.size()); + + for (int i = 0; i < 10; i++) { + assertEquals(i, (int)map.get(i).get1()); + assertEquals((long)1, (long)map.get(i).get2()); + } + } + } + + /** + * + */ + private static class StoreFactory implements Factory<CacheStore> { + @Override public CacheStore create() { + return new TestStore(); + } + } + + /** + * Store. + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { + for (int i = 0; i < 10; i++) + clo.apply(i, i); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // No-op. 
+ } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java new file mode 100644 index 0000000..7b97928 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Continuous query update counter test for a {@code PARTITIONED}, {@code ATOMIC} cache on two grids. + */ +public class CacheContinuousQueryCounterPartitionedAtomicTest extends CacheContinuousQueryCounterAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java new file mode 100644 index 0000000..aa42832 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Continuous query update counter test for a {@code PARTITIONED}, {@code TRANSACTIONAL} cache on two grids. + */ +public class CacheContinuousQueryCounterPartitionedTxTest extends CacheContinuousQueryCounterAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java new file mode 100644 index 0000000..afa7a22 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Continuous query update counter test for a {@code REPLICATED}, {@code ATOMIC} cache on two grids. + */ +public class CacheContinuousQueryCounterReplicatedAtomicTest extends CacheContinuousQueryCounterAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java new file mode 100644 index 0000000..4ee12de --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Continuous query update counter test for a {@code REPLICATED}, {@code TRANSACTIONAL} cache on two grids. + */ +public class CacheContinuousQueryCounterReplicatedTxTest extends CacheContinuousQueryCounterAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index d9b2091..62ed66f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -18,7 +18,14 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -31,10 +38,13 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.store.CacheStore; @@ -46,6 +56,9 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import 
org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -58,6 +71,12 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * @@ -70,12 +89,15 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract private static final int NODES = 5; /** */ - private static final int KEYS = 10; + private static final int KEYS = 50; /** */ private static final int VALS = 10; /** */ + public static final int ITERATION_CNT = 100; + + /** */ private boolean client; /** {@inheritDoc} */ @@ -110,6 +132,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * @throws Exception If failed. */ + public void testAtomicClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); + } + + /** + * @throws Exception If failed. 
+ */ public void testAtomic() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, @@ -117,7 +152,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); } /** @@ -130,7 +178,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -143,7 +217,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. 
+ */ + public void testAtomicOffheapValuesClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -156,7 +256,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTieredAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTieredClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -169,7 +295,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoBackupsAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoBackupsClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -182,7 +334,59 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. + */ + public void testTxExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testTxClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); + } + + /** + * @throws Exception If failed. + */ + public void testTxClientExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -195,7 +399,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicatedClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -208,7 +425,46 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_VALUES, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValuesAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxOffheapValuesExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValuesClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -221,7 +477,46 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract OFFHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTieredAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTieredClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTieredClientExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** @@ -234,54 +529,141 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract ONHEAP_TIERED, false); - testContinuousQuery(ccfg); + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxNoBackupsAllNodes() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, ALL); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoBackupsExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, SERVER); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoBackupsClient() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg, CLIENT); } /** * @param ccfg Cache configuration. + * @param deploy The place where continuous query will be started. * @throws Exception If failed. */ - private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception { + private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) + throws Exception { ignite(0).createCache(ccfg); try { - IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName()); - long seed = System.currentTimeMillis(); Random rnd = new Random(seed); log.info("Random seed: " + seed); - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = - new ArrayBlockingQueue<>(10_000); + Collection<QueryCursor<?>> curs = new ArrayList<>(); - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - for (CacheEntryEvent<?, ?> evt : evts) { - // System.out.println("Event: " + evt); + if (deploy == CLIENT) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - evtsQueue.add(evt); + final 
BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); } - } - }); + }); + + evtsQueues.add(evtsQueue); + + QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry); + + curs.add(cur); + } + else if (deploy == SERVER) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + + evtsQueues.add(evtsQueue); + + QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry); + + curs.add(cur); + } + else { + for (int i = 0; i < NODES - 1; i++) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + + evtsQueues.add(evtsQueue); - QueryCursor<?> cur = cache.query(qry); + QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry); + + curs.add(cur); + } + } ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + Map<Integer, Long> partCntr = new ConcurrentHashMap<>(); + try { - for (int i = 0; i < 1000; i++) { - if (i % 100 == 0) + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 20 == 0) log.info("Iteration: " + i); - randomUpdate(rnd, evtsQueue, expData, cache); + for (int idx = 0; idx < NODES; idx++) + randomUpdate(rnd, evtsQueues, expData, 
partCntr, grid(idx).cache(ccfg.getName())); } } finally { - cur.close(); + for (QueryCursor<?> cur : curs) + cur.close(); } } finally { @@ -291,176 +673,389 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * @param rnd Random generator. - * @param evtsQueue Events queue. + * @param evtsQueues Events queue. * @param expData Expected cache data. + * @param partCntr Partition counter. * @param cache Cache. * @throws Exception If failed. */ private void randomUpdate( Random rnd, - BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue, + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, ConcurrentMap<Object, Object> expData, + Map<Integer, Long> partCntr, IgniteCache<Object, Object> cache) throws Exception { Object key = new QueryTestKey(rnd.nextInt(KEYS)); Object newVal = value(rnd); Object oldVal = expData.get(key); - int op = rnd.nextInt(11); + int op = rnd.nextInt(13); - // log.info("Random operation [key=" + key + ", op=" + op + ']'); + Ignite ignite = cache.unwrap(Ignite.class); - switch (op) { - case 0: { - cache.put(key, newVal); + Transaction tx = null; - waitEvent(evtsQueue, key, newVal, oldVal); + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); - expData.put(key, newVal); + try { + // log.info("Random operation [key=" + key + ", op=" + op + ']'); - break; - } + switch (op) { + case 0: { + cache.put(key, newVal); - case 1: { - cache.getAndPut(key, newVal); + if (tx != null) + tx.commit(); - waitEvent(evtsQueue, key, newVal, oldVal); + updatePartitionCounter(cache, key, partCntr); - expData.put(key, newVal); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - break; - } + expData.put(key, newVal); - case 2: { - cache.remove(key); + break; + } - waitEvent(evtsQueue, key, null, oldVal); + case 1: { + cache.getAndPut(key, newVal); - 
expData.remove(key); + if (tx != null) + tx.commit(); - break; - } + updatePartitionCounter(cache, key, partCntr); - case 3: { - cache.getAndRemove(key); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - waitEvent(evtsQueue, key, null, oldVal); + expData.put(key, newVal); - expData.remove(key); + break; + } - break; - } + case 2: { + cache.remove(key); - case 4: { - cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + if (tx != null) + tx.commit(); - waitEvent(evtsQueue, key, newVal, oldVal); + updatePartitionCounter(cache, key, partCntr); - expData.put(key, newVal); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - break; - } + expData.remove(key); - case 5: { - cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + break; + } - waitEvent(evtsQueue, key, null, oldVal); + case 3: { + cache.getAndRemove(key); - expData.remove(key); + if (tx != null) + tx.commit(); - break; - } + updatePartitionCounter(cache, key, partCntr); - case 6: { - cache.putIfAbsent(key, newVal); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - if (oldVal == null) { - waitEvent(evtsQueue, key, newVal, null); + expData.remove(key); - expData.put(key, newVal); + break; } - else - checkNoEvent(evtsQueue); - break; - } + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); - case 7: { - cache.getAndPutIfAbsent(key, newVal); + updatePartitionCounter(cache, key, partCntr); - if (oldVal == null) { - waitEvent(evtsQueue, key, newVal, null); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); + + break; } - else - checkNoEvent(evtsQueue); - break; - } + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); - case 8: { - cache.replace(key, newVal); + if (tx != null) + tx.commit(); - if (oldVal != null) { - 
waitEvent(evtsQueue, key, newVal, oldVal); + updatePartitionCounter(cache, key, partCntr); - expData.put(key, newVal); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; } - else - checkNoEvent(evtsQueue); - break; - } + case 6: { + cache.putIfAbsent(key, newVal); - case 9: { - cache.getAndReplace(key, newVal); + if (tx != null) + tx.commit(); - if (oldVal != null) { - waitEvent(evtsQueue, key, newVal, oldVal); + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); - expData.put(key, newVal); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; } - else - checkNoEvent(evtsQueue); - break; - } + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } - case 10: { - if (oldVal != null) { - Object replaceVal = value(rnd); + case 8: { + cache.replace(key, newVal); - boolean success = replaceVal.equals(oldVal); + if (tx != null) + tx.commit(); - if (success) { - cache.replace(key, replaceVal, newVal); + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); - waitEvent(evtsQueue, key, newVal, oldVal); + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); expData.put(key, newVal); } + else + checkNoEvent(evtsQueues); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 10: { + if (oldVal != 
null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + } else { - cache.replace(key, replaceVal, newVal); + cache.replace(key, value(rnd), newVal); + + if (tx != null) + tx.commit(); - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); } + + break; } - else { - cache.replace(key, value(rnd), newVal); - checkNoEvent(evtsQueue); + case 11: { + SortedMap<Object, Object> vals = new TreeMap<>(); + + while (vals.size() < KEYS / 5) + vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd)); + + cache.putAll(vals); + + if (tx != null) + tx.commit(); + + for (Map.Entry<Object, Object> e : vals.entrySet()) + updatePartitionCounter(cache, e.getKey(), partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData); + + expData.putAll(vals); + + break; } - break; + case 12: { + SortedMap<Object, Object> vals = new TreeMap<>(); + + while (vals.size() < KEYS / 5) + vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal); + + cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + for (Map.Entry<Object, Object> e : vals.entrySet()) + updatePartitionCounter(cache, e.getKey(), partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData); + + for (Object o : vals.keySet()) + expData.put(o, newVal); + + break; + } + + default: + fail("Op:" + op); } + } finally { + if (tx != null) + tx.close(); + } + } + + /** + * @param evtsQueues Queue. + * @param partCntrs Counters. + * @param aff Affinity. + * @param vals Values. + * @param expData Expected data. 
+ */ + private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, + Map<Integer, Long> partCntrs, + Affinity<Object> aff, + SortedMap<Object, Object> vals, + Map<Object, Object> expData) + throws Exception { + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>(); + + for (int i = 0; i < vals.size(); i++) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); + + rcvEvts.put(evt.getKey(), evt); + } + + assertEquals(vals.size(), rcvEvts.size()); + + for (Map.Entry<Object, Object> e : vals.entrySet()) { + Object key = e.getKey(); + Object val = e.getValue(); + Object oldVal = expData.get(key); + + if (val == null && oldVal == null) { + checkNoEvent(evtsQueues); - default: - fail(); + continue; + } + + CacheEntryEvent evt = rcvEvts.get(key); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', + evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); + + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class); + + assertNotNull(cntr); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); + } } } /** + * @param rnd {@link Random}. + * @return {@link TransactionIsolation}. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + int val = rnd.nextInt(3); + + if (val == 0) + return READ_COMMITTED; + else if (val == 1) + return REPEATABLE_READ; + else + return SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** + * @param cache Cache. 
+ * @param key Key + * @param cntrs Partition counters. + */ + private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) { + Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName()); + + int part = aff.partition(key); + + Long partCntr = cntrs.get(part); + + if (partCntr == null) + partCntr = 0L; + + cntrs.put(part, ++partCntr); + } + + /** * @param rnd Random generator. * @return Cache value. */ @@ -469,38 +1064,55 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** - * @param evtsQueue Event queue. + * @param evtsQueues Event queue. + * @param partCntrs Partition counters. + * @param aff Affinity function. * @param key Key. * @param val Value. * @param oldVal Old value. * @throws Exception If failed. */ - private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue, - Object key, Object val, Object oldVal) throws Exception { + private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, + Map<Integer, Long> partCntrs, + Affinity<Object> aff, + Object key, + Object val, + Object oldVal) + throws Exception { if (val == null && oldVal == null) { - checkNoEvent(evtsQueue); + checkNoEvent(evtsQueues); return; } - CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); - assertNotNull("Failed to wait for event [key=" + key + - ", val=" + val + - ", oldVal=" + oldVal + ']', evt); - assertEquals(key, evt.getKey()); - assertEquals(val, evt.getValue()); - assertEquals(oldVal, evt.getOldValue()); + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = 
evt.unwrap(CacheQueryEntryEvent.class); + + assertNotNull(cntr); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); + } } /** - * @param evtsQueue Event queue. + * @param evtsQueues Event queue. * @throws Exception If failed. */ - private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); + private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception { + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); - assertNull(evt); + assertNull(evt); + } } /** @@ -564,7 +1176,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * */ - static class QueryTestKey implements Serializable { + static class QueryTestKey implements Serializable, Comparable { /** */ private final Integer key; @@ -597,6 +1209,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract @Override public String toString() { return S.toString(QueryTestKey.class, this); } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } } /** @@ -644,6 +1261,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract return S.toString(QueryTestValue.class, this); } } + /** * */ @@ -681,4 +1299,10 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } } + /** + * + */ + protected enum ContinuousDeploy { + CLIENT, SERVER, ALL + } }
