Repository: ignite Updated Branches: refs/heads/ignite-2004 09afb1453 -> 07d62cea1
IGNITE-2004 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07d62cea Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07d62cea Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07d62cea Branch: refs/heads/ignite-2004 Commit: 07d62cea1f52a803993b36eebc137f026a68c1dd Parents: 09afb14 Author: nikolay_tikhonov <[email protected]> Authored: Tue Apr 12 19:37:09 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Apr 12 19:37:09 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 32 ++- .../continuous/CacheContinuousQueryManager.java | 11 +- ...eContinuousQueryAsyncFilterListenerTest.java | 265 ++++++++++++++----- 3 files changed, 236 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/07d62cea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 1caab3a..ed39a17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; @@ -301,19 +302,34 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert routineId != null; assert ctx != null; - if (locLsnr != null) - ctx.resource().injectGeneric(locLsnr); + if (locLsnr != null) { + if (locLsnr instanceof JCacheQueryLocalListener) { + ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl); - final CacheEntryEventFilter filter = getEventFilter(); + asyncCallback = ((JCacheQueryLocalListener)locLsnr).async(); + } + else { + ctx.resource().injectGeneric(locLsnr); - asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class); + asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class); + } + } + + final CacheEntryEventFilter filter = getEventFilter(); if (filter != null) { - ctx.resource().injectGeneric(filter); + if (filter instanceof JCacheQueryRemoteFilter) { + ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl); - if (!asyncCallback) - asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class) - || (filter instanceof JCacheQueryRemoteFilter && ((JCacheQueryRemoteFilter)filter).async()); + if (!asyncCallback) + asyncCallback = ((JCacheQueryRemoteFilter)filter).async(); + } + else { + ctx.resource().injectGeneric(filter); + + if (!asyncCallback) + asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class); + } } entryBufs = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/07d62cea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 48b2546..b283935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -924,9 +924,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * */ - private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { + static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { /** */ - private final CacheEntryListener<K, V> impl; + final CacheEntryListener<K, V> impl; /** */ private final IgniteLogger log; @@ -1002,6 +1002,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { return evts; } + + /** + * @return {@code True} if listener should be executed in non-system thread. + */ + protected boolean async() { + return U.hasAnnotation(impl, IgniteAsyncCallback.class); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/07d62cea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java index 5f7232a..b1ed6cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -22,6 +22,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; @@ -124,77 +126,119 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr * @throws Exception If failed. */ public void testNonDeadLockInListenerTx() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerTxOffHeap() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeapJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerTxOffHeapValues() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerAtomic() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerReplicatedAtomic() throws Exception { - testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomicJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception { - testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerAtomicOffHeap() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicWithoutBackupJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInListener() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInListenerReplicated() throws Exception { - testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true); + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); } /// @@ -205,70 +249,105 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr * @throws Exception If failed. */ public void testNonDeadLockInFilterTx() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterTxOffHeap() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterTxOffHeapValues() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomic() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterReplicatedAtomic() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicOffHeap() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, true); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilter() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterReplicated() throws Exception { - testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true); + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); } /// @@ -279,81 +358,83 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr * @throws Exception If failed. */ public void testNonDeadLockInFilterTxSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); } /** * @throws Exception If failed. */ public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception { - testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true); + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); } /** * @param ccfg Cache configuration. - * @param asyncFilter Async filter. - * @param asyncListener Async listener. + * @param asyncFltr Async filter. + * @param asyncLsnr Async listener. + * @param jcacheApi Use JCache api for registration entry update listener. * @throws Exception If failed. */ - public void testNonDeadLockInListener(CacheConfiguration ccfg, - final boolean asyncFilter, - boolean asyncListener) throws Exception { + private void testNonDeadLockInListener(CacheConfiguration ccfg, + final boolean asyncFltr, + boolean asyncLsnr, + boolean jcacheApi) throws Exception { ignite(0).createCache(ccfg); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -371,8 +452,6 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr final QueryTestValue val0 = new QueryTestValue(1); final QueryTestValue newVal = new QueryTestValue(2); - ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); - final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); @@ -380,7 +459,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) { - if (asyncFilter) { + if (asyncFltr) { assertFalse("Failed: " + Thread.currentThread().getName(), Thread.currentThread().getName().contains("sys-")); @@ -435,20 +514,39 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr } }; - if (asyncListener) - conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr)); - else - conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); + QueryCursor qry = null; + MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null; + + CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr) + : new CacheInvokeListener(lsnrClsr); + + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFltr ? + new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr); + + if (jcacheApi) { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(locLsnr), + FactoryBuilder.factoryOf(rmtFltr), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + } + else { + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + conQry.setLocalListener(locLsnr); - if (asyncFilter) - conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(fltrClsr))); - else - conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr)); - try (QueryCursor qry = cache.query(conQry)) { + qry = cache.query(conQry); + } + + try { if (rnd.nextBoolean()) cache.put(key, val0); - else + else { cache.invoke(key, new CacheEntryProcessor() { @Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException { @@ -457,6 +555,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr return null; } }); + } assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS)); @@ -464,6 +563,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr assertTrue("Failed to waiting event from listener.", U.await(latch, 3, SECONDS)); } + finally { + if (qry != null) + qry.close(); + + if (lsnrCfg != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } log.info("Iteration finished: " + i); } @@ -477,11 +583,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr * @param ccfg Cache configuration. * @param asyncFilter Async filter. * @param asyncLsnr Async listener. + * @param jcacheApi Use JCache api for start update listener. * @throws Exception If failed. */ private void testNonDeadLockInFilter(CacheConfiguration ccfg, final boolean asyncFilter, - final boolean asyncLsnr) throws Exception { + final boolean asyncLsnr, + boolean jcacheApi) throws Exception { ignite(0).createCache(ccfg); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -499,8 +607,6 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr final QueryTestValue val0 = new QueryTestValue(1); final QueryTestValue newVal = new QueryTestValue(2); - ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); - final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); @@ -581,17 +687,37 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr } }; - if (asyncFilter) - conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(fltrClsr))); - else - conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); - if (asyncLsnr) - conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr)); - else - conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); + QueryCursor qry = null; + MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null; + + CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr) + : new CacheInvokeListener(lsnrClsr); + + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFilter ? + new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr); + + if (jcacheApi) { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(locLsnr), + FactoryBuilder.factoryOf(rmtFltr), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + } + else { + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + conQry.setLocalListener(locLsnr); + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr)); + + qry = cache.query(conQry); + } - try (QueryCursor qry = cache.query(conQry)) { + try { if (rnd.nextBoolean()) cache.put(key, val0); else @@ -610,6 +736,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS)); } + finally { + if (qry != null) + qry.close(); + + if (lsnrCfg != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } log.info("Iteration finished: " + i); } @@ -701,7 +834,8 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr /** * */ - private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { + private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>, + CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, Serializable { @IgniteInstanceResource private Ignite ignite; @@ -723,6 +857,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) clsr.apply(ignite, e); } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) + clsr.apply(ignite, e); + } } /**
