Repository: ignite Updated Branches: refs/heads/ignite-2004 09b56444d -> 80b134e67
IGNITE-2004 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80b134e6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80b134e6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80b134e6 Branch: refs/heads/ignite-2004 Commit: 80b134e67942237400b17e79b27af5adf0c877fd Parents: 09b5644 Author: Tikhonov Nikolay <[email protected]> Authored: Tue Apr 5 21:50:16 2016 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Tue Apr 5 21:50:16 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 9 +- .../cache/GridCacheUpdateAtomicResult.java | 7 - .../continuous/CacheContinuousQueryClosure.java | 6 +- .../continuous/CacheContinuousQueryEvent.java | 1 - .../continuous/CacheContinuousQueryHandler.java | 110 ++-- .../CacheContinuousQueryListener.java | 1 - .../thread/IgniteStripedThreadPoolExecutor.java | 11 +- ...eContinuousQueryAsyncFilterListenerTest.java | 392 +++++++++++--- .../CacheContinuousQueryDeadlockTest.java | 523 +++++++++++++++++++ ...usQueryFactoryFilterRandomOperationTest.java | 4 +- .../CacheContinuousQueryOrderingEventTest.java | 195 +++++-- ...ridCacheContinuousQueryAbstractSelfTest.java | 8 - .../IgniteCacheQuerySelfTestSuite3.java | 6 +- 13 files changed, 1078 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c317e56..a0ae1c5 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; @@ -36,7 +35,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -49,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntry import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -2512,13 +2509,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Continuous query filter should be perform under lock. 
if (lsnrs != null) { - CacheObject evtVal = val; + CacheObject evtVal = updated; CacheObject evtOldVal = oldVal; if (isOffHeapValuesOnly()) { - evtVal = cctx.toCacheObject(cctx.unwrapTemporary(updated)); + evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal)); - evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(updated0)); + evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal)); } clsrs = cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index cbd9707..a96675b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -179,13 +179,6 @@ public class GridCacheUpdateAtomicResult { } /** - * @param clsrs Closures. - */ - private void continuousQueryClosures(List<CacheContinuousQueryClosure> clsrs) { - this.cntQryClsrs = clsrs; - } - - /** * @return Continuous query closures. 
*/ public List<CacheContinuousQueryClosure> continuousQueryClosures() { http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java index f000b93..3fd9e57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java @@ -18,16 +18,16 @@ package org.apache.ignite.internal.processors.cache.query.continuous; /** - * + * Continuous query closure. */ public interface CacheContinuousQueryClosure extends Runnable { /** - * + * Callback for case when future completed successfully. */ public void onEntryUpdate(); /** - * + * Callback for case when future completed with error. 
*/ public void skipEvent(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 2bfd53d..7b70290 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import javax.cache.Cache; import org.apache.ignite.cache.query.CacheQueryEntryEvent; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index b3d5028..fff8a92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -42,13 +42,11 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; -import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; @@ -80,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.NotNull; @@ -538,7 +537,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, final GridKernalContext ctx) { assert nodeId != null; assert routineId != null; assert objs != null; @@ -548,9 +547,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler final GridCacheContext cctx = cacheContext(ctx); - Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); + final Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); - 
List<PartitionRecovery> recoveries = new ArrayList<>(); + final List<PartitionRecovery> rcvs = new ArrayList<>(); try { for (CacheContinuousQueryEntry e : entries) { @@ -570,12 +569,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler try { e.unmarshal(cctx, ldr); - T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = handleEvent(ctx, e); + if (!asyncCallback) { + T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = handleEvent(ctx, e, false); - if (evts.get2() != null) - recoveries.add(evts.get2()); + if (evts.get2() != null) + rcvs.add(evts.get2()); - entries0.addAll(evts.get1()); + entries0.addAll(evts.get1()); + } } catch (IgniteCheckedException ex) { if (ignoreClsNotFound) @@ -587,51 +588,43 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - if (!entries0.isEmpty()) { - if (asyncCallback) { - Iterable<CacheContinuousQueryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheContinuousQueryEvent<? extends K, ? extends V>>() { - @Override public CacheContinuousQueryEvent<? extends K, ? extends V> apply( - CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); - } - } - ); + if (asyncCallback) { + for (final CacheContinuousQueryEntry e : entries) { + ctx.continuousQueryPool().execute(new Runnable() { + @Override public void run() { + T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = + handleEvent(ctx, e, false); - for (final CacheContinuousQueryEvent<? extends K, ? extends V> e : evts) { - ctx.continuousQueryPool().execute(new Runnable() { - @Override public void run() { - locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? 
extends V>>singleton(e)); + for (CacheContinuousQueryEntry entry : evts.get1()) { + CacheContinuousQueryEvent evt = + new CacheContinuousQueryEvent<>(cache, cctx, entry); + + locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>> + singleton(evt)); } - }, e.partitionId()); - } + } + }, e.partition()); } - else { - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override - public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); - } + } + else if (!entries0.isEmpty()) { + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, + new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { + @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); } - ); + } + ); - locLsnr.onUpdated(evts); - } + locLsnr.onUpdated(evts); } } finally { - for (PartitionRecovery rec : recoveries) + for (PartitionRecovery rec : rcvs) rec.unlock(); } } @@ -639,10 +632,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * @param ctx Context. * @param e entry. + * @param async Async. * @return Entry collection. 
*/ private T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> handleEvent(GridKernalContext ctx, - CacheContinuousQueryEntry e) { + CacheContinuousQueryEntry e, boolean async) { assert e != null; if (internal) { @@ -658,7 +652,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); - return new T2<>(rec.collectEntries(e), rec); + return new T2<>(rec.collectEntries(e, async), async ? null : rec); } /** @@ -788,8 +782,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param entry Cache continuous query entry. * @return Collection entries which will be fired. */ - public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { - lock.lock(); + public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry, boolean async) { + if (!async) + lock.lock(); try { assert entry != null; @@ -893,7 +888,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return entries; } catch (Exception e) { - lock.unlock(); + if (!async) + lock.unlock(); throw new IgniteException("Failed to collect entries."); } @@ -1352,7 +1348,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler filter(); if (fireEvent || waitIfAsync()) - onEntryUpdate(); + onEntryUpdate0(); } /** @@ -1398,12 +1394,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return; } + onEntryUpdate0(); + } + + /** + * + */ + private void onEntryUpdate0() { try { final CacheContinuousQueryEntry entry = evt.entry(); if (loc) { if (!locCache) { - T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> events = handleEvent(ctx, entry); + T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> events = + handleEvent(ctx, entry, asyncCallback); try { Collection<CacheContinuousQueryEntry> entries = events.get1(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index bf1d4a4..3aefafe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 4cc6416..44ea823 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -69,7 +69,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix); for (int i = 0; i < concurrentLvl; i++) - execs[i] = Executors.newFixedThreadPool(poolSize, 
factory); + if (poolSize == 1) + execs[i] = Executors.newSingleThreadExecutor(factory); + else + execs[i] = Executors.newFixedThreadPool(poolSize, factory); // Find power-of-two sizes best matching arguments int sshift = 0; @@ -161,8 +164,8 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { public void execute(Runnable task, int idx) { if (idx < execs.length) execs[idx].execute(task); - - execs[idx % execs.length].execute(task); + else + execs[idx % execs.length].execute(task); } /** {@inheritDoc} */ @@ -248,7 +251,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { private <T> ExecutorService execForTask(T cmd) { assert cmd != null; - return execs[(hash(cmd.hashCode()) >>> segShift) & segMask]; + return execs[(hash(System.identityHashCode(cmd)) >>> segShift) & segMask]; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java index 7958ac3..6780c18 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -18,20 +18,28 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.io.Serializable; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import 
java.util.concurrent.TimeUnit; import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; @@ -46,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; @@ -350,20 +359,27 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr public void testNonDeadLockInListener(CacheConfiguration ccfg, final boolean asyncFilter, boolean asyncListener) throws Exception { - final IgniteCache cache = ignite(0).createCache(ccfg); + ignite(0).createCache(ccfg); - try { - final QueryTestKey key = affinityKey(cache); - - final QueryTestValue val0 = new 
QueryTestValue(1); - final QueryTestValue newVal = new QueryTestValue(2); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + try { for (int i = 0; i < ITERATION_CNT; i++) { log.info("Start iteration: " + i); + int nodeIdx = i % NODES; + + final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName()); + + final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { @@ -381,41 +397,48 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue> e) { - IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? 
extends QueryTestValue> e) { + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); - QueryTestValue val = e.getValue(); + QueryTestValue val = e.getValue(); - if (val == null || !val.equals(new QueryTestValue(1))) - return; + if (val == null) + return; + else if (val.equals(newVal)) { + evtFromLsnrLatch.countDown(); - Transaction tx = null; + return; + } + else if (!val.equals(val0)) + return; - try { - if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + Transaction tx = null; - assertEquals(val, val0); + try { + if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - cache0.put(key, newVal); + assertEquals(val, val0); - if (tx != null) - tx.commit(); + cache0.put(key, newVal); - latch.countDown(); - } - catch (Exception exp) { - log.error("Failed: ", exp); + if (tx != null) + tx.commit(); - throw new IgniteException(exp); - } - finally { - if (tx != null) - tx.close(); + latch.countDown(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } } - } - }; + }; if (asyncListener) conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr)); @@ -428,11 +451,23 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); try (QueryCursor qry = cache.query(conQry)) { - cache.put(key, val0); + if (rnd.nextBoolean()) + cache.put(key, val0); + else + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... 
arguments) + throws EntryProcessorException { + entry.setValue(val0); + + return null; + } + }); - assert U.await(latch, 3, SECONDS) : "Failed to waiting event."; + assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS)); assertEquals(cache.get(key), new QueryTestValue(2)); + + assertTrue("Failed to waiting event from listener.", U.await(latch, 3, SECONDS)); } log.info("Iteration finished: " + i); @@ -444,77 +479,270 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr } /** + * @throws Exception If failed. + */ + public void testDeadLockInListenerAtomic() throws Exception { + testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void testDeadLockInListener(CacheConfiguration ccfg) throws Exception { + ignite(0).createCache(ccfg); + + final IgniteCache cache = grid(0).cache(ccfg.getName()); + + final QueryTestKey key = affinityKey(cache); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? 
extends QueryTestValue> e) { + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null || !val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + latch.countDown(); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); + + try (QueryCursor qry = cache.query(conQry)) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.put(key, val0); + + return null; + } + }); + + f.get(1, SECONDS); + + return null; + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInFilterAtomic() throws Exception { + testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. 
+ */ + private void testDeadLockInFilter(CacheConfiguration ccfg) throws Exception { + ignite(0).createCache(ccfg); + + final IgniteCache cache = grid(0).cache(ccfg.getName()); + + final QueryTestKey key = affinityKey(cache); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null || !val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class) + .getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + latch.countDown(); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); + + conQry.setLocalListener(new CacheInvokeListener( + new CI2<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, + CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) { + // No-op. 
+ } + })); + + try (QueryCursor qry = cache.query(conQry)) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.put(key, val0); + + return null; + } + }); + + f.get(1, SECONDS); + + return null; + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); + } + } + + /** * @param ccfg Cache configuration. * @param asyncFilter Async filter. * @param asyncListener Async listener. * @throws Exception If failed. */ - public void testNonDeadLockInFilter(CacheConfiguration ccfg, + private void testNonDeadLockInFilter(CacheConfiguration ccfg, final boolean asyncFilter, final boolean asyncListener) throws Exception { - final IgniteCache cache = ignite(0).createCache(ccfg); + ignite(0).createCache(ccfg); - try { - final QueryTestKey key = affinityKey(cache); - - final QueryTestValue val0 = new QueryTestValue(1); - final QueryTestValue newVal = new QueryTestValue(2); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + try { for (int i = 0; i < ITERATION_CNT; i++) { log.info("Start iteration: " + i); + int nodeIdx = i % NODES; + + final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName()); + + final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue> e) { - if (asyncFilter) { - assertFalse("Failed: " + Thread.currentThread().getName(), - Thread.currentThread().getName().contains("sys-")); - - assertTrue("Failed: " + Thread.currentThread().getName(), - Thread.currentThread().getName().contains("contQry-")); - } + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + if (asyncFilter) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("contQry-")); + } + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); - IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + QueryTestValue val = e.getValue(); - QueryTestValue val = e.getValue(); + if (val == null) + return; + else if (val.equals(newVal)) { + evtFromLsnrLatch.countDown(); - if (val == null || !val.equals(new QueryTestValue(1))) - return; + return; + } + else if (!val.equals(val0)) + return; - Transaction tx = null; + Transaction tx = null; - try { - if (cache0.getConfiguration(CacheConfiguration.class) - .getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + try { + if (cache0.getConfiguration(CacheConfiguration.class) + .getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - assertEquals(val, val0); + assertEquals(val, val0); - cache0.put(key, newVal); + cache0.put(key, newVal); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); - latch.countDown(); - } - catch (Exception exp) { - log.error("Failed: ", exp); + latch.countDown(); + } + catch (Exception exp) { + log.error("Failed: ", exp); - throw new IgniteException(exp); - } - finally { - if 
(tx != null) - tx.close(); + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } } - } - }; + }; IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { @@ -550,11 +778,23 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); try (QueryCursor qry = cache.query(conQry)) { - cache.put(key, val0); + if (rnd.nextBoolean()) + cache.put(key, val0); + else + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + entry.setValue(val0); + + return null; + } + }); assert U.await(latch, 3, SECONDS) : "Failed to waiting event."; assertEquals(cache.get(key), new QueryTestValue(2)); + + assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS)); } log.info("Iteration finished: " + i); @@ -584,7 +824,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return TimeUnit.SECONDS.toMillis(10); + return TimeUnit.SECONDS.toMillis(15); } /** @@ -769,7 +1009,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr if (o == null || getClass() != o.getClass()) return false; - QueryTestValue that = (QueryTestValue) o; + QueryTestValue that = (QueryTestValue)o; return val1.equals(that.val1) && val2.equals(that.val2); } http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java new file mode 100644 index 0000000..59d5382 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + 
+import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class CacheContinuousQueryDeadlockTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(1000); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDeadLockInListenerAtomic() throws Exception { + testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInListenerAtomicWithOffheap() throws Exception { + testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED)); + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInListenerAtomicWithOffheapValues() throws Exception { + testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES)); + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInListenerReplicatedAtomic() throws Exception { + testDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void testDeadLockInListener(CacheConfiguration ccfg) throws Exception { + ignite(0).createCache(ccfg); + + final IgniteCache cache = grid(0).cache(ccfg.getName()); + + final QueryTestKey key = affinityKey(cache); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? 
extends QueryTestValue> e) { + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null || !val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + latch.countDown(); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + } + }; + + conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); + + try (QueryCursor qry = cache.query(conQry)) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.put(key, val0); + + return null; + } + }); + + f.get(1, SECONDS); + + return null; + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); + } + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInFilterAtomic() throws Exception { + testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInFilterAtomicOffheapValues() throws Exception { + testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES)); + } + + /** + * @throws Exception If failed. + */ + public void testDeadLockInFilterReplicated() throws Exception { + testDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. 
+ */ + private void testDeadLockInFilter(CacheConfiguration ccfg) throws Exception { + ignite(0).createCache(ccfg); + + final IgniteCache cache = grid(0).cache(ccfg.getName()); + + final QueryTestKey key = affinityKey(cache); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null || !val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class) + .getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + latch.countDown(); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); + + conQry.setLocalListener(new CacheInvokeListener( + new CI2<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, + CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) { + // No-op. 
+ } + })); + + try (QueryCursor qry = cache.query(conQry)) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.put(key, val0); + + return null; + } + }); + + f.get(1, SECONDS); + + return null; + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); + } + } + + /** + * @param cache Ignite cache. + * @return Key. + */ + private QueryTestKey affinityKey(IgniteCache cache) { + Affinity aff = affinity(cache); + + for (int i = 0; i < 10_000; i++) { + QueryTestKey key = new QueryTestKey(i); + + if (aff.isPrimary(localNode(cache), key)) + return key; + } + + throw new IgniteException("Failed to found primary key."); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TimeUnit.SECONDS.toMillis(30); + } + + /** + * + */ + private static class CacheTestRemoteFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; + + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue> e) + throws CacheEntryListenerException { + clsr.apply(ignite, e); + + return true; + } + } + + /** + * + */ + private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; + + /** + * @param clsr Closure. + */ + public CacheInvokeListener(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) + clsr.apply(ignite, e); + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. 
+ */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue)o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java index ec6ed4a..9800b56 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java @@ -266,7 +266,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC Map<Integer, Long> partCntr, IgniteCache<Object, Object> cache) throws Exception { - Object key = rnd.nextInt(KEYS); + Object key = new QueryTestKey(rnd.nextInt(KEYS)); Object newVal = value(rnd); Object oldVal = expData.get(key); @@ -280,7 +280,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); try { - log.info("Random operation [key=" + key + ", op=" + op + ']'); + // log.info("Random operation [key=" + key + ", op=" + op + ']'); switch (op) { case 0: { http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java index c42533e..e728b91 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -24,9 +24,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; @@ -39,7 +41,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; @@ -48,7 +49,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -68,7 +69,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; */ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTest { /** */ - public static final int LISTENER_CNT = 20; + public static final int LISTENER_CNT = 3; /** */ public static final int KEYS = 10; @@ -85,6 +86,9 @@ 
public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes /** */ private boolean client; + /** */ + private static volatile boolean fail; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -94,7 +98,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes cfg.setClientMode(client); MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); - storeSpi.setExpireCount(1000); + storeSpi.setExpireCount(100); cfg.setEventStorageSpi(storeSpi); @@ -119,6 +123,13 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes super.afterTestsStopped(); } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + fail = false; + } + /** * @throws Exception If failed. */ @@ -152,7 +163,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void testAtomicReplicared() throws Exception { + public void testAtomicReplicated() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED); @@ -162,7 +173,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void testAtomicReplicaredOffheap() throws Exception { + public void testAtomicReplicatedOffheap() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED); @@ -199,6 +210,88 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes doOrderingTest(ccfg, false); } + // ASYNC + + /** + * @throws Exception If failed. 
+ */ + public void testAtomicOnheapTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.OFFHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.OFFHEAP_VALUES); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedOffheapAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.OFFHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapWithoutBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxOnheapTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.TRANSACTIONAL, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, true); + } + /** * @param ccfg Cache configuration. * @param async Async filter. @@ -214,28 +307,40 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes try { List<BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>>> rcvdEvts = - new ArrayList<>(LISTENER_CNT); + new ArrayList<>(LISTENER_CNT * NODES); final AtomicInteger qryCntr = new AtomicInteger(0); - final int threadCnt = LISTENER_CNT; + final int threadCnt = 20; - for (int i = 0; i < LISTENER_CNT; i++) { - BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue = - new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt); + for (int idx = 0; idx < NODES; idx++) { + for (int i = 0; i < LISTENER_CNT; i++) { + BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue = + new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt); - ContinuousQuery qry = new ContinuousQuery(); + ContinuousQuery qry = new ContinuousQuery(); - qry.setLocalListener(new TestCacheEventListener(queue, qryCntr)); + if (async) { + qry.setLocalListener(new TestCacheAsyncEventListener(queue, qryCntr)); - rcvdEvts.add(queue); + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf( + new CacheTestRemoteFilterAsync(ccfg.getName()))); + } + else { + qry.setLocalListener(new TestCacheEventListener(queue, qryCntr)); - IgniteCache<Object, Object> cache = - grid(ThreadLocalRandom.current().nextInt(NODES)).cache(ccfg.getName()); + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf( + new 
CacheTestRemoteFilter(ccfg.getName()))); + } + + rcvdEvts.add(queue); + + IgniteCache<Object, Object> cache = grid(idx).cache(ccfg.getName()); - QueryCursor qryCursor = cache.query(qry); + QueryCursor qryCursor = cache.query(qry); - qries.add(qryCursor); + qries.add(qryCursor); + } } IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() { @@ -297,12 +402,14 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT; + return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT * NODES; } }, 1000L); for (BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue : rcvdEvts) checkEvents(queue, ITERATION_CNT * threadCnt); + + assertFalse("Ordering invocations of filter broken.", fail); } finally { for (QueryCursor<?> qry : qries) @@ -332,7 +439,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes assertEquals(new QueryTestValue(0), evt.getValue()); else { if (!new QueryTestValue(preVal + 1).equals(evt.getValue())) - assertEquals(new QueryTestValue(preVal + 1), evt.getValue()); + assertEquals("Key event: " + evt.getKey(), new QueryTestValue(preVal + 1), evt.getValue()); } vals.put(evt.getKey(), evt.getValue().val1); @@ -354,11 +461,10 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes @IgniteAsyncCallback private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { /** - * @param clsr Closure. + * @param cacheName Cache name. */ - public CacheTestRemoteFilterAsync( - IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue>> clsr) { - super(clsr); + public CacheTestRemoteFilterAsync(String cacheName) { + super(cacheName); } } @@ -368,24 +474,33 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes private static class CacheTestRemoteFilter implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { /** */ + private Map<QueryTestKey, QueryTestValue> prevVals = new ConcurrentHashMap<>(); + + /** */ @IgniteInstanceResource private Ignite ignite; /** */ - private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; + private String cacheName; /** - * @param clsr Closure. + * @param cacheName Cache name. */ - public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> clsr) { - this.clsr = clsr; + public CacheTestRemoteFilter(String cacheName) { + this.cacheName = cacheName; } /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) throws CacheEntryListenerException { - clsr.apply(ignite, e); + if (affinity(ignite.cache(cacheName)).isPrimary(ignite.cluster().localNode(), e.getKey())) { + QueryTestValue prevVal = prevVals.put(e.getKey(), e.getValue()); + + if (prevVal != null) { + if (!new QueryTestValue(prevVal.val1 + 1).equals(e.getValue())) + fail = true; + } + } return true; } @@ -395,12 +510,12 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes * */ @IgniteAsyncCallback - private static class TestCacheEventListenerAsync extends TestCacheEventListener { + private static class TestCacheAsyncEventListener extends TestCacheEventListener { /** * @param queue Queue. * @param cntr Received events counter. 
*/ - public TestCacheEventListenerAsync(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, + public TestCacheAsyncEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, AtomicInteger cntr) { super(queue, cntr); } @@ -427,13 +542,27 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes } /** {@inheritDoc} */ - @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> events) throws CacheEntryListenerException { + Integer prevVal = null; + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) { + if (prevVal == null) + prevVal = e.getValue().val1; + queue.add((CacheEntryEvent<QueryTestKey, QueryTestValue>)e); cntr.incrementAndGet(); + + if (prevVal > e.getValue().val1) { + int z = 0; + + ++z; + } + else + prevVal = e.getValue().val1; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index fc101d4..dbe282e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import javax.cache.configuration.Factory; import 
javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; @@ -312,13 +311,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } }); - qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, Integer>() { - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event) - throws CacheEntryListenerException { - return true; - } - }); - try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { cache.put(1, 1); cache.put(2, 2); http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index 912c8f9..b7d9daf 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -21,9 +21,11 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryDeadlockTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest; import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; @@ -91,7 +93,9 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class); suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class); suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class); - suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class);; + suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class); + suite.addTestSuite(CacheContinuousQueryDeadlockTest.class); + suite.addTestSuite(CacheContinuousQueryOrderingEventTest.class); suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); suite.addTestSuite(CacheContinuousBatchAckTest.class); suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
