Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 09afb1453 -> 07d62cea1


IGNITE-2004 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07d62cea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07d62cea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07d62cea

Branch: refs/heads/ignite-2004
Commit: 07d62cea1f52a803993b36eebc137f026a68c1dd
Parents: 09afb14
Author: nikolay_tikhonov <[email protected]>
Authored: Tue Apr 12 19:37:09 2016 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Tue Apr 12 19:37:09 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java |  32 ++-
 .../continuous/CacheContinuousQueryManager.java |  11 +-
 ...eContinuousQueryAsyncFilterListenerTest.java | 265 ++++++++++++++-----
 3 files changed, 236 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/07d62cea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1caab3a..ed39a17 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -61,6 +61,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import 
org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
@@ -301,19 +302,34 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         assert routineId != null;
         assert ctx != null;
 
-        if (locLsnr != null)
-            ctx.resource().injectGeneric(locLsnr);
+        if (locLsnr != null) {
+            if (locLsnr instanceof JCacheQueryLocalListener) {
+                
ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl);
 
-        final CacheEntryEventFilter filter = getEventFilter();
+                asyncCallback = ((JCacheQueryLocalListener)locLsnr).async();
+            }
+            else {
+                ctx.resource().injectGeneric(locLsnr);
 
-        asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
+                asyncCallback = U.hasAnnotation(locLsnr, 
IgniteAsyncCallback.class);
+            }
+        }
+
+        final CacheEntryEventFilter filter = getEventFilter();
 
         if (filter != null) {
-            ctx.resource().injectGeneric(filter);
+            if (filter instanceof JCacheQueryRemoteFilter) {
+                
ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl);
 
-            if (!asyncCallback)
-                asyncCallback = U.hasAnnotation(filter, 
IgniteAsyncCallback.class)
-                    || (filter instanceof JCacheQueryRemoteFilter && 
((JCacheQueryRemoteFilter)filter).async());
+                if (!asyncCallback)
+                    asyncCallback = ((JCacheQueryRemoteFilter)filter).async();
+            }
+            else {
+                ctx.resource().injectGeneric(filter);
+
+                if (!asyncCallback)
+                    asyncCallback = U.hasAnnotation(filter, 
IgniteAsyncCallback.class);
+            }
         }
 
         entryBufs = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/07d62cea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 48b2546..b283935 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -924,9 +924,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     /**
      *
      */
-    private static class JCacheQueryLocalListener<K, V> implements 
CacheEntryUpdatedListener<K, V> {
+    static class JCacheQueryLocalListener<K, V> implements 
CacheEntryUpdatedListener<K, V> {
         /** */
-        private final CacheEntryListener<K, V> impl;
+        final CacheEntryListener<K, V> impl;
 
         /** */
         private final IgniteLogger log;
@@ -1002,6 +1002,13 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
 
             return evts;
         }
+
+        /**
+         * @return {@code True} if listener should be executed in non-system 
thread.
+         */
+        protected boolean async() {
+            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/07d62cea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
index 5f7232a..b1ed6cd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -22,6 +22,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -124,77 +126,119 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerTx() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerTxOffHeap() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeapJCacheApi() throws Exception 
{
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerTxOffHeapValues() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_VALUES), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_VALUES), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomic() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerReplicatedAtomic() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomicJCacheApi() throws 
Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() 
throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomicOffHeap() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomicOffHeapValues() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomicWithoutBackup() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicWithoutBackupJCacheApi() throws 
Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListener() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerReplicated() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true);
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedJCacheApi() throws 
Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
     }
 
     ///
@@ -205,70 +249,105 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTx() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxOffHeap() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxOffHeapValues() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_VALUES), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_VALUES), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomic() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterReplicatedAtomic() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicOffHeap() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapJCacheApi() throws 
Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilter() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterReplicated() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true);
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
     }
 
     ///
@@ -279,81 +358,83 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxSyncFilter() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_VALUES), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, OFFHEAP_VALUES), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
ONHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, 
ONHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, 
OFFHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws 
Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, 
ONHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterSyncFilter() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), false, true, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception 
{
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), false, true);
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, 
TRANSACTIONAL, ONHEAP_TIERED), false, true, false);
     }
 
     /**
      * @param ccfg Cache configuration.
-     * @param asyncFilter Async filter.
-     * @param asyncListener Async listener.
+     * @param asyncFltr Async filter.
+     * @param asyncLsnr Async listener.
+     * @param jcacheApi Use JCache api for registration entry update listener.
      * @throws Exception If failed.
      */
-    public void testNonDeadLockInListener(CacheConfiguration ccfg,
-        final boolean asyncFilter,
-        boolean asyncListener) throws Exception {
+    private void testNonDeadLockInListener(CacheConfiguration ccfg,
+        final boolean asyncFltr,
+        boolean asyncLsnr,
+        boolean jcacheApi) throws Exception {
         ignite(0).createCache(ccfg);
 
         ThreadLocalRandom rnd = ThreadLocalRandom.current();
@@ -371,8 +452,6 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
                 final QueryTestValue val0 = new QueryTestValue(1);
                 final QueryTestValue newVal = new QueryTestValue(2);
 
-                ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new 
ContinuousQuery<>();
-
                 final CountDownLatch latch = new CountDownLatch(1);
                 final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
 
@@ -380,7 +459,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
                     new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends 
QueryTestKey, ? extends QueryTestValue>>() {
                         @Override public void apply(Ignite ignite, 
CacheEntryEvent<? extends QueryTestKey,
                             ? extends QueryTestValue> e) {
-                            if (asyncFilter) {
+                            if (asyncFltr) {
                                 assertFalse("Failed: " + 
Thread.currentThread().getName(),
                                     
Thread.currentThread().getName().contains("sys-"));
 
@@ -435,20 +514,39 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
                         }
                     };
 
-                if (asyncListener)
-                    conQry.setLocalListener(new 
CacheInvokeListenerAsync(lsnrClsr));
-                else
-                    conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
+                QueryCursor qry = null;
+                MutableCacheEntryListenerConfiguration<QueryTestKey, 
QueryTestValue> lsnrCfg = null;
+
+                CacheInvokeListener locLsnr = asyncLsnr ? new 
CacheInvokeListenerAsync(lsnrClsr)
+                    : new CacheInvokeListener(lsnrClsr);
+
+                CacheEntryEventSerializableFilter<QueryTestKey, 
QueryTestValue> rmtFltr = asyncFltr ?
+                    new CacheTestRemoteFilterAsync(fltrClsr) : new 
CacheTestRemoteFilter(fltrClsr);
+
+                if (jcacheApi) {
+                    lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                        FactoryBuilder.factoryOf(locLsnr),
+                        FactoryBuilder.factoryOf(rmtFltr),
+                        true,
+                        false
+                    );
+
+                    cache.registerCacheEntryListener(lsnrCfg);
+                }
+                else {
+                    ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new 
ContinuousQuery<>();
+
+                    conQry.setLocalListener(locLsnr);
 
-                if (asyncFilter)
-                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new 
CacheTestRemoteFilterAsync(fltrClsr)));
-                else
-                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new 
CacheTestRemoteFilter(fltrClsr)));
+                    
conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr));
 
-                try (QueryCursor qry = cache.query(conQry)) {
+                    qry = cache.query(conQry);
+                }
+
+                try {
                     if (rnd.nextBoolean())
                         cache.put(key, val0);
-                    else
+                    else {
                         cache.invoke(key, new CacheEntryProcessor() {
                             @Override public Object process(MutableEntry 
entry, Object... arguments)
                                 throws EntryProcessorException {
@@ -457,6 +555,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
                                 return null;
                             }
                         });
+                    }
 
                     assertTrue("Failed to waiting event.", U.await(latch, 3, 
SECONDS));
 
@@ -464,6 +563,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
 
                     assertTrue("Failed to waiting event from listener.", 
U.await(latch, 3, SECONDS));
                 }
+                finally {
+                    if (qry != null)
+                        qry.close();
+
+                    if (lsnrCfg != null)
+                        cache.deregisterCacheEntryListener(lsnrCfg);
+                }
 
                 log.info("Iteration finished: " + i);
             }
@@ -477,11 +583,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
      * @param ccfg Cache configuration.
      * @param asyncFilter Async filter.
      * @param asyncLsnr Async listener.
+     * @param jcacheApi Use JCache api for start update listener.
      * @throws Exception If failed.
      */
     private void testNonDeadLockInFilter(CacheConfiguration ccfg,
         final boolean asyncFilter,
-        final boolean asyncLsnr) throws Exception {
+        final boolean asyncLsnr,
+        boolean jcacheApi) throws Exception {
         ignite(0).createCache(ccfg);
 
         ThreadLocalRandom rnd = ThreadLocalRandom.current();
@@ -499,8 +607,6 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
                 final QueryTestValue val0 = new QueryTestValue(1);
                 final QueryTestValue newVal = new QueryTestValue(2);
 
-                ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new 
ContinuousQuery<>();
-
                 final CountDownLatch latch = new CountDownLatch(1);
                 final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
 
@@ -581,17 +687,37 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
                         }
                     };
 
-                if (asyncFilter)
-                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new 
CacheTestRemoteFilterAsync(fltrClsr)));
-                else
-                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new 
CacheTestRemoteFilter(fltrClsr)));
 
-                if (asyncLsnr)
-                    conQry.setLocalListener(new 
CacheInvokeListenerAsync(lsnrClsr));
-                else
-                    conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
+                QueryCursor qry = null;
+                MutableCacheEntryListenerConfiguration<QueryTestKey, 
QueryTestValue> lsnrCfg = null;
+
+                CacheInvokeListener locLsnr = asyncLsnr ? new 
CacheInvokeListenerAsync(lsnrClsr)
+                    : new CacheInvokeListener(lsnrClsr);
+
+                CacheEntryEventSerializableFilter<QueryTestKey, 
QueryTestValue> rmtFltr = asyncFilter ?
+                    new CacheTestRemoteFilterAsync(fltrClsr) : new 
CacheTestRemoteFilter(fltrClsr);
+
+                if (jcacheApi) {
+                    lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                        FactoryBuilder.factoryOf(locLsnr),
+                        FactoryBuilder.factoryOf(rmtFltr),
+                        true,
+                        false
+                    );
+
+                    cache.registerCacheEntryListener(lsnrCfg);
+                }
+                else {
+                    ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new 
ContinuousQuery<>();
+
+                    conQry.setLocalListener(locLsnr);
+
+                    
conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr));
+
+                    qry = cache.query(conQry);
+                }
 
-                try (QueryCursor qry = cache.query(conQry)) {
+                try {
                     if (rnd.nextBoolean())
                         cache.put(key, val0);
                     else
@@ -610,6 +736,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
 
                     assertTrue("Failed to waiting event from filter.", 
U.await(latch, 3, SECONDS));
                 }
+                finally {
+                    if (qry != null)
+                        qry.close();
+
+                    if (lsnrCfg != null)
+                        cache.deregisterCacheEntryListener(lsnrCfg);
+                }
 
                 log.info("Iteration finished: " + i);
             }
@@ -701,7 +834,8 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
     /**
      *
      */
-    private static class CacheInvokeListener implements 
CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> {
+    private static class CacheInvokeListener implements 
CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, Serializable {
         @IgniteInstanceResource
         private Ignite ignite;
 
@@ -723,6 +857,13 @@ public class CacheContinuousQueryAsyncFilterListenerTest 
extends GridCommonAbstr
             for (CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue> e : events)
                 clsr.apply(ignite, e);
         }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends 
QueryTestKey,
+            ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue> e : events)
+                clsr.apply(ignite, e);
+        }
     }
 
     /**

Reply via email to