IGNITE-2004 Fixed review notes.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b118b685
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b118b685
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b118b685

Branch: refs/heads/ignite-2004
Commit: b118b6850458bd0946a52e4fb4cd0ecde8c56458
Parents: a9c8159
Author: nikolay_tikhonov <[email protected]>
Authored: Mon Apr 11 17:36:17 2016 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Mon Apr 11 17:36:17 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     |  36 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  24 +-
 .../distributed/near/GridNearAtomicCache.java   |   1 +
 .../continuous/CacheContinuousQueryHandler.java | 462 +++++++++----------
 .../CacheContinuousQueryListener.java           |   8 +-
 .../continuous/CacheContinuousQueryManager.java |  23 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +-
 ...ryFactoryAsyncFilterRandomOperationTest.java |  40 ++
 .../CacheContinuousQueryOrderingEventTest.java  |  26 +-
 ...acheContinuousQueryRandomOperationsTest.java |  13 +
 11 files changed, 332 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 8270c21..c7f3a38 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -24,10 +24,10 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -492,7 +492,7 @@ public interface GridCacheEntryEx {
         String taskName,
         @Nullable CacheObject prevVal,
         @Nullable Long updateCntr,
-        @Nullable IgniteInternalFuture fut
+        @Nullable GridDhtAtomicUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 921be85..f64803a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +41,7 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import 
org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1243,7 +1243,6 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     internal,
                     partition(),
                     tx.local(),
-                    true,
                     false,
                     updateCntr0,
                     null,
@@ -1442,7 +1441,6 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     internal,
                     partition(),
                     tx.local(),
-                    true,
                     false,
                     updateCntr0,
                     null,
@@ -1820,7 +1818,6 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     internal,
                     partition(),
                     true,
-                    true,
                     false,
                     updateCntr,
                     null,
@@ -1874,7 +1871,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         String taskName,
         @Nullable CacheObject prevVal,
         @Nullable Long updateCntr,
-        @Nullable IgniteInternalFuture fut
+        @Nullable GridDhtAtomicUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, 
GridClosureException {
         assert cctx.atomic();
 
@@ -2011,8 +2008,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0,
-                            null);
+                            updateCntr0 == null ? 0 : updateCntr0);
                     }
                     // Will update something.
                     else {
@@ -2096,9 +2092,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                                 isInternal() || !context().userCache(),
                                 partition(),
                                 primary,
-                                true,
                                 false,
                                 updateCntr0,
+                                null,
                                 topVer);
                         }
 
@@ -2111,8 +2107,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            0,
-                            null);
+                            0);
                     }
                 }
                 else
@@ -2189,8 +2184,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateCntr0 == null ? 0 : updateCntr0,
-                        null);
+                        updateCntr0 == null ? 0 : updateCntr0);
                 }
             }
 
@@ -2238,8 +2232,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateCntr0 == null ? 0 : updateCntr0,
-                        null);
+                        updateCntr0 == null ? 0 : updateCntr0);
                 }
             }
             else
@@ -2340,8 +2333,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0,
-                            null);
+                            updateCntr0 == null ? 0 : updateCntr0);
                     else if (interceptorVal != updated0) {
                         updated0 = cctx.unwrapTemporary(interceptorVal);
 
@@ -2424,8 +2416,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0,
-                            null);
+                            updateCntr0 == null ? 0 : updateCntr0);
                 }
 
                 if (writeThrough)
@@ -2527,8 +2518,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     evtOldVal = 
cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal));
                 }
 
-                clsrs = cctx.continuousQueries().onEntryUpdated(lsnrs, key, 
evtVal, evtOldVal, internal,
-                    partition(), primary, false, false, updateCntr0, topVer);
+                cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, 
evtOldVal, internal,
+                    partition(), primary, false, updateCntr0, fut, topVer);
             }
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE, keepBinary);
@@ -2556,8 +2547,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             enqueueVer,
             conflictCtx,
             true,
-            updateCntr0 == null ? 0 : updateCntr0,
-            clsrs);
+            updateCntr0 == null ? 0 : updateCntr0);
     }
 
     /**
@@ -3331,9 +3321,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         this.isInternal() || !this.context().userCache(),
                         this.partition(),
                         true,
-                        true,
                         preload,
                         updateCntr,
+                        null,
                         topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 73a2ede..b0fc948 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -97,6 +97,9 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
 
+    /** Continuous query closures. */
+    private Collection<CI1<Boolean>> cntQryClsrs;
+
     /** */
     private final boolean waitForExchange;
 
@@ -336,16 +339,33 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         }
     }
 
+    /**
+     * @param clsr Continuous query closure.
+     */
+    public void addContinuousQueryClosure(CI1<Boolean> clsr){
+        if (cntQryClsrs == null)
+            cntQryClsrs = new ArrayList<>(10);
+
+        cntQryClsrs.add(clsr);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
         if (super.onDone(res, err)) {
             cctx.mvcc().removeAtomicFuture(version());
 
-            if (err != null) {
+            boolean suc = err == null;
+
+            if (!suc) {
                 for (KeyCacheObject key : keys)
                     updateRes.addFailedKey(key, err);
             }
 
+            if (cntQryClsrs != null) {
+                for (CI1<Boolean> clsr : cntQryClsrs)
+                    clsr.apply(suc);
+            }
+
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)
                 completionCb.apply(updateReq, updateRes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index e0c7187..d8cbe6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -373,6 +373,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName,
                             null,
+                            null,
                             null);
 
                         if (updRes.removeVersion() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3ecac40..79d19c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -34,17 +34,13 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
@@ -53,7 +49,6 @@ import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -64,6 +59,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
@@ -73,15 +69,13 @@ import 
org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -367,11 +361,10 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                 return keepBinary;
             }
 
-            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, 
V> evt,
+            @Override public void onEntryUpdated(final 
CacheContinuousQueryEvent<K, V> evt,
                 boolean primary,
-                boolean recordIgniteEvt,
-                boolean fireEvent,
-                IgniteInternalFuture<?> fut) {
+                final boolean recordIgniteEvt,
+                GridDhtAtomicUpdateFuture fut) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
                     return ;
 
@@ -385,7 +378,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                 assert !skipPrimaryCheck || (cctx.isReplicated() && 
ctx.localNodeId().equals(nodeId));
 
                 if (asyncCallback) {
-                    ContinuousQueryClosureImpl clsr = new 
ContinuousQueryClosureImpl(
+                    ContinuousQueryAsyncClosure clsr = new 
ContinuousQueryAsyncClosure(
                         primary,
                         evt,
                         recordIgniteEvt,
@@ -393,6 +386,23 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
 
                     ctx.continuousQueryPool().execute(clsr, evt.partitionId());
                 }
+                else {
+                    final boolean notify = filter(evt, primary);
+
+                    if (primary || skipPrimaryCheck) {
+                        if (fut == null)
+                            onEntryUpdate(evt, notify, loc, recordIgniteEvt);
+                        else
+                            fut.addContinuousQueryClosure(new CI1<Boolean>() {
+                                @Override public void apply(Boolean suc) {
+                                    if (!suc)
+                                        evt.entry().markFiltered();
+
+                                    onEntryUpdate(evt, notify, loc, 
recordIgniteEvt);
+                                }
+                            });
+                    }
+                }
             }
 
             @Override public void onUnregister() {
@@ -439,14 +449,14 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             }
 
             @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, 
V> evt,
-                AffinityTopologyVersion topVer, boolean primary, boolean 
fireEvt) {
+                AffinityTopologyVersion topVer, boolean primary) {
                 assert evt != null;
 
                 CacheContinuousQueryEntry e = evt.entry();
 
                 e.markFiltered();
 
-                onEntryUpdated(evt, primary, false, fireEvt, null);
+                onEntryUpdated(evt, primary, false, null);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -543,85 +553,85 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, 
Collection<?> objs, final GridKernalContext ctx) {
+    @Override public void notifyCallback(final UUID nodeId,
+        final UUID routineId,
+        Collection<?> objs,
+        final GridKernalContext ctx) {
         assert nodeId != null;
         assert routineId != null;
         assert objs != null;
         assert ctx != null;
 
-        Collection<CacheContinuousQueryEntry> entries = 
(Collection<CacheContinuousQueryEntry>)objs;
+        final Collection<CacheContinuousQueryEntry> entries = 
(Collection<CacheContinuousQueryEntry>)objs;
+
+        if (entries.iterator().hasNext()) {
+            if (asyncCallback) {
+                int partId = entries.iterator().next().partition();
 
+                ctx.continuousQueryPool().execute(new Runnable() {
+                    @Override public void run() {
+                        notifyCallback0(nodeId, ctx, entries);
+                    }
+                }, partId);
+            }
+            else
+                notifyCallback0(nodeId, ctx, entries);
+        }
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @param ctx Kernal context.
+     * @param entries Entries.
+     */
+    private void notifyCallback0(UUID nodeId,
+        final GridKernalContext ctx,
+        Collection<CacheContinuousQueryEntry> entries) {
         final GridCacheContext cctx = cacheContext(ctx);
 
         final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = 
new ArrayList<>();
 
-        final List<PartitionRecovery> rcvs = new ArrayList<>();
+        for (CacheContinuousQueryEntry e : entries) {
+            GridCacheDeploymentManager depMgr = cctx.deploy();
 
-        try {
-            for (CacheContinuousQueryEntry e : entries) {
-                GridCacheDeploymentManager depMgr = cctx.deploy();
+            ClassLoader ldr = depMgr.globalLoader();
 
-                ClassLoader ldr = depMgr.globalLoader();
+            if (ctx.config().isPeerClassLoadingEnabled()) {
+                GridDeploymentInfo depInfo = e.deployInfo();
 
-                if (ctx.config().isPeerClassLoadingEnabled()) {
-                    GridDeploymentInfo depInfo = e.deployInfo();
-
-                    if (depInfo != null) {
-                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), 
depInfo.userVersion(), depInfo.deployMode(),
-                            depInfo.participants(), 
depInfo.localDeploymentOwner());
-                    }
+                if (depInfo != null) {
+                    depMgr.p2pContext(nodeId, depInfo.classLoaderId(), 
depInfo.userVersion(), depInfo.deployMode(),
+                        depInfo.participants(), 
depInfo.localDeploymentOwner());
                 }
+            }
 
-                try {
-                    e.unmarshal(cctx, ldr);
-
-                    if (!asyncCallback) {
-                        T2<Collection<CacheEntryEvent<? extends K, ? extends 
V>>, PartitionRecovery> evts =
-                            handleEvent(ctx, e, false);
+            try {
+                e.unmarshal(cctx, ldr);
 
-                        if (evts.get2() != null)
-                            rcvs.add(evts.get2());
+                Collection<CacheEntryEvent<? extends K, ? extends V>> evts = 
handleEvent(ctx, e);
 
-                        entries0.addAll(evts.get1());
-                    }
-                }
-                catch (IgniteCheckedException ex) {
-                    if (ignoreClsNotFound)
-                        assert internal;
-                    else
-                        U.error(ctx.log(getClass()), "Failed to unmarshal 
entry.", ex);
-                }
+                if (evts != null && !evts.isEmpty())
+                    entries0.addAll(evts);
             }
-
-            if (asyncCallback) {
-                for (final CacheContinuousQueryEntry e : entries) {
-                    ctx.continuousQueryPool().execute(new Runnable() {
-                        @Override public void run() {
-                            T2<Collection<CacheEntryEvent<? extends K, ? 
extends V>>, PartitionRecovery> evts =
-                                handleEvent(ctx, e, false);
-
-                            locLsnr.onUpdated(evts.get1());
-                        }
-                    }, e.partition());
-                }
+            catch (IgniteCheckedException ex) {
+                if (ignoreClsNotFound)
+                    assert internal;
+                else
+                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", 
ex);
             }
-            else if (!entries0.isEmpty())
-                locLsnr.onUpdated(entries0);
-        }
-        finally {
-            for (PartitionRecovery rec : rcvs)
-                rec.unlock();
         }
+
+        if (!entries0.isEmpty())
+            locLsnr.onUpdated(entries0);
     }
 
     /**
      * @param ctx Context.
      * @param e entry.
-     * @param async Async.
      * @return Entry collection.
      */
-    private T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, 
PartitionRecovery>
-        handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e, 
boolean async) {
+    private Collection<CacheEntryEvent<? extends K, ? extends V>> 
handleEvent(GridKernalContext ctx,
+        CacheContinuousQueryEntry e) {
         assert e != null;
 
         GridCacheContext<K, V> cctx = cacheContext(ctx);
@@ -630,22 +640,137 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
 
         if (internal) {
             if (e.isFiltered())
-                return new T2(Collections.emptyList(), null);
+                return Collections.emptyList();
             else
-                return new T2(F.<CacheEntryEvent<? extends K, ? extends 
V>>asList(
-                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e)), 
null);
+                return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
+                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
         }
 
         // Initial query entry or evicted entry. These events should be fired 
immediately.
         if (e.updateCounter() == -1L) {
-            return !e.isFiltered() ? new T2(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(
-                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e)), 
null) :
-                new T2(Collections.<CacheEntryEvent<? extends K, ? extends 
V>>emptyList(), null);
+            return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends 
V>>asList(
+                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) :
+                Collections.<CacheEntryEvent<? extends K, ? extends 
V>>emptyList();
         }
 
         PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, 
e.partition());
 
-        return new T2<>(rec.<K, V>collectEntries(e, cctx, async, cache), rec);
+        return rec.collectEntries(e, cctx, cache);
+    }
+
+    /**
+     * @param primary Primary.
+     * @param evt Query event.
+     * @return {@code True} if event passed filter otherwise {@code true}.
+     */
+    public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+        CacheContinuousQueryEntry entry = evt.entry();
+
+        boolean notify = !entry.isFiltered();
+
+        try {
+            if (notify && getEventFilter() != null)
+                notify = getEventFilter().evaluate(evt);
+        }
+        catch (Exception e) {
+            U.error(log, "CacheEntryEventFilter failed: " + e);
+        }
+
+        if (!notify)
+            entry.markFiltered();
+
+        if (!primary) {
+            if (!internal) {
+                // Skip init query and expire entries.
+                if (entry.updateCounter() != -1L) {
+                    entry.markBackup();
+
+                    backupQueue.add(entry);
+                }
+            }
+        }
+
+        return notify;
+    }
+
+    /**
+     * @param evt Continuous query event.
+     * @param notify Notify flag.
+     * @param loc Listener deployed on this node.
+     * @param recordIgniteEvt Record ignite event.
+     */
+    private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, 
boolean loc, boolean recordIgniteEvt) {
+        try {
+            GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+            if (cctx == null)
+                return;
+
+            final CacheContinuousQueryEntry entry = evt.entry();
+
+            if (loc) {
+                if (!locCache) {
+                    Collection<CacheEntryEvent<? extends K, ? extends V>> evts 
= handleEvent(ctx, entry);
+
+                    if (!evts.isEmpty()) {
+                        locLsnr.onUpdated(evts);
+
+                        if (!internal && !skipPrimaryCheck)
+                            
sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                    }
+                }
+                else {
+                    if (!entry.isFiltered())
+                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt));
+                }
+            }
+            else {
+                if (!entry.isFiltered())
+                    prepareEntry(cctx, nodeId, entry);
+
+                CacheContinuousQueryEntry e = handleEntry(entry);
+
+                if (e != null)
+                    ctx.continuous().addNotification(nodeId, routineId, entry, 
topic, sync, true);
+            }
+        }
+        catch (ClusterTopologyCheckedException ex) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send event notification to node, node 
left cluster " +
+                    "[node=" + nodeId + ", err=" + ex + ']');
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(ctx.log(getClass()), "Failed to send event notification to 
node: " + nodeId, ex);
+        }
+
+        if (recordIgniteEvt && notify) {
+            ctx.event().record(new CacheQueryReadEvent<>(
+                ctx.discovery().localNode(),
+                "Continuous query executed.",
+                EVT_CACHE_QUERY_OBJECT_READ,
+                CacheQueryType.CONTINUOUS.name(),
+                cacheName,
+                null,
+                null,
+                null,
+                getEventFilter() instanceof CacheEntryEventSerializableFilter ?
+                    (CacheEntryEventSerializableFilter)getEventFilter() : null,
+                null,
+                nodeId,
+                taskName(),
+                evt.getKey(),
+                evt.getValue(),
+                evt.getOldValue(),
+                null
+            ));
+        }
+    }
+
+    /**
+     * @return Task name.
+     */
+    private String taskName() {
+        return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) 
: null;
     }
 
     /**
@@ -748,9 +873,6 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         /** */
         private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new 
TreeMap<>();
 
-        /** */
-        private Lock lock = new ReentrantLock();
-
         /**
          * @param log Logger.
          * @param topVer Topology version.
@@ -778,23 +900,19 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
          */
         <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> 
collectEntries(CacheContinuousQueryEntry entry,
             GridCacheContext cctx,
-            boolean async,
             IgniteCache cache) {
-            if (!async)
-                lock.lock();
+            assert entry != null;
 
-            try {
-                assert entry != null;
+            if (entry.topologyVersion() == null) { // Possible if entry is 
sent from old node.
+                assert entry.updateCounter() == 0L : entry;
 
-                if (entry.topologyVersion() == null) { // Possible if entry is 
sent from old node.
-                    assert entry.updateCounter() == 0L : entry;
-
-                    return F.<CacheEntryEvent<? extends K, ? extends V>>
-                        asList(new CacheContinuousQueryEvent<K, V>(cache, 
cctx, entry));
-                }
+                return F.<CacheEntryEvent<? extends K, ? extends V>>
+                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, 
entry));
+            }
 
-                List<CacheEntryEvent<? extends K, ? extends V>> entries;
+            List<CacheEntryEvent<? extends K, ? extends V>> entries;
 
+            synchronized (pendingEvts) {
                 // Received first event.
                 if (curTop == AffinityTopologyVersion.NONE) {
                     lastFiredEvt = entry.updateCounter();
@@ -886,22 +1004,9 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                             break;
                     }
                 }
-
-                return entries;
             }
-            catch (Exception e) {
-                if (!async)
-                    lock.unlock();
-
-                throw new IgniteException("Failed to collect entries.");
-            }
-        }
 
-        /**
-         * Unlock.
-         */
-        public void unlock() {
-            lock.unlock();
+            return entries;
         }
     }
 
@@ -1255,7 +1360,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     /**
      *
      */
-    private class ContinuousQueryClosureImpl implements Runnable {
+    private class ContinuousQueryAsyncClosure implements Runnable {
         /** */
         private CacheContinuousQueryEvent<K, V> evt;
 
@@ -1263,9 +1368,6 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         private boolean primary;
 
         /** */
-        private boolean notify;
-
-        /** */
         private boolean recordIgniteEvt;
 
         /** */
@@ -1275,8 +1377,9 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
          * @param primary Primary flag.
          * @param evt Event.
          * @param recordIgniteEvt Fired event.
+         * @param fut Dht future.
          */
-        ContinuousQueryClosureImpl(
+        ContinuousQueryAsyncClosure(
             boolean primary,
             CacheContinuousQueryEvent<K, V> evt,
             boolean recordIgniteEvt,
@@ -1289,20 +1392,21 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
 
         /** {@inheritDoc} */
         @Override public void run() {
-            if (!filter())
-                return;
+            boolean notify = filter(evt, primary);
 
-            if (fut != null) {
-                if (waitIfAsync())
-                    onEntryUpdate0();
-                else {
-                    evt.entry().markFiltered();
+            if (primary()) {
+                if (fut != null) {
+                    if (waitFuture())
+                        onEntryUpdate(evt, notify, 
nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                    else {
+                        evt.entry().markFiltered();
 
-                    onEntryUpdate0();
+                        onEntryUpdate(evt, notify, 
nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                    }
                 }
+                else
+                    onEntryUpdate(evt, notify, 
nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
             }
-            else
-                onEntryUpdate0();
         }
 
         /**
@@ -1313,9 +1417,9 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         }
 
         /**
-         * @return {@code False} if filter sync.
+         * @return {@code False} if future completed with error otherwise 
{@code true}.
          */
-        private boolean waitIfAsync() {
+        private boolean waitFuture() {
             try {
                 fut.get();
             }
@@ -1325,124 +1429,6 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
 
             return true;
         }
-
-        /**
-         *
-         */
-        private void onEntryUpdate0() {
-            try {
-                GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-                if (cctx == null)
-                    return;
-
-                final CacheContinuousQueryEntry entry = evt.entry();
-
-                if (routineId.equals(nodeId)) {
-                    if (!locCache) {
-                        T2<Collection<CacheEntryEvent<? extends K, ? extends 
V>>, PartitionRecovery> events =
-                            handleEvent(ctx, entry, asyncCallback);
-
-                        try {
-                            Collection<CacheEntryEvent<? extends K, ? extends 
V>> evts = events.get1();
-
-                            if (!evts.isEmpty()) {
-                                locLsnr.onUpdated(evts);
-
-                                if (!internal && !skipPrimaryCheck)
-                                    
sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
-                            }
-                        }
-                        finally {
-                            if (events.get2() != null)
-                                events.get2().unlock();
-                        }
-                    }
-                    else {
-                        if (!entry.isFiltered())
-                            locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, 
? extends V>>asList(evt));
-                    }
-                }
-                else {
-                    if (!entry.isFiltered())
-                        prepareEntry(cctx, nodeId, entry);
-
-                    CacheContinuousQueryEntry e = handleEntry(entry);
-
-                    if (e != null)
-                        ctx.continuous().addNotification(nodeId, routineId, 
entry, topic, sync, true);
-                }
-            }
-            catch (ClusterTopologyCheckedException ex) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send event notification to node, node 
left cluster " +
-                        "[node=" + nodeId + ", err=" + ex + ']');
-            }
-            catch (IgniteCheckedException ex) {
-                U.error(ctx.log(getClass()), "Failed to send event 
notification to node: " + nodeId, ex);
-            }
-
-            if (recordIgniteEvt && notify) {
-                ctx.event().record(new CacheQueryReadEvent<>(
-                    ctx.discovery().localNode(),
-                    "Continuous query executed.",
-                    EVT_CACHE_QUERY_OBJECT_READ,
-                    CacheQueryType.CONTINUOUS.name(),
-                    cacheName,
-                    null,
-                    null,
-                    null,
-                    getEventFilter() instanceof 
CacheEntryEventSerializableFilter ?
-                        (CacheEntryEventSerializableFilter)getEventFilter() : 
null,
-                    null,
-                    nodeId,
-                    taskName(),
-                    evt.getKey(),
-                    evt.getValue(),
-                    evt.getOldValue(),
-                    null
-                ));
-            }
-        }
-
-        /**
-         * @return {@code True} if event happen on primary node otherwise 
{@code false}.
-         */
-        public boolean filter() {
-            CacheContinuousQueryEntry entry = evt.entry();
-
-            notify = !entry.isFiltered();
-
-            try {
-                if (notify && getEventFilter() != null)
-                    notify = getEventFilter().evaluate(evt);
-            }
-            catch (Exception e) {
-                U.error(log, "CacheEntryEventFilter failed: " + e);
-            }
-
-            if (!notify)
-                entry.markFiltered();
-
-            if (!primary()) {
-                if (!internal) {
-                    // Skip init query and expire entries.
-                    if (entry.updateCounter() != -1L) {
-                        entry.markBackup();
-
-                        backupQueue.add(entry);
-                    }
-                }
-
-                return false;
-            }
-
-            return true;
-        }
-
-        private String taskName() {
-            return ctx.security().enabled() ? 
ctx.task().resolveTaskName(taskHash) : null;
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index e86ec47..8eca81c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -19,8 +19,8 @@ package 
org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,11 +38,10 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param evt Event
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
-     * @param fireEvent Immediately fired events.
      * @param fut Dht atomic future.
      */
     public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean 
primary,
-        boolean recordIgniteEvt, boolean fireEvent, @Nullable 
IgniteInternalFuture<?> fut);
+        boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut);
 
     /**
      * Listener unregistered callback.
@@ -74,8 +73,7 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param topVer Topology version.
      * @param primary Primary
      */
-    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
-        AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt);
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, 
AffinityTopologyVersion topVer, boolean primary);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f6ab8b5..3a5e891 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -25,7 +25,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
@@ -47,8 +46,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
@@ -57,11 +54,13 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import 
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -170,7 +169,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
      * @param partId Partition id.
      * @param updCntr Updated counter.
      * @param primary Primary.
-     * @param fireEvnt
      * @param topVer Topology version.
      */
     public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
@@ -178,7 +176,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         int partId,
         long updCntr,
         boolean primary,
-        boolean fireEvnt,
         AffinityTopologyVersion topVer) {
         assert lsnrs != null;
 
@@ -197,7 +194,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt);
+            lsnr.skipUpdateEvent(evt, topVer, primary);
         }
     }
 
@@ -242,10 +239,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         boolean internal,
         int partId,
         boolean primary,
-        boolean fireEvnt,
         boolean preload,
         long updateCntr,
-        @Nullable IgniteInternalFuture<?> fut,
+        @Nullable GridDhtAtomicUpdateFuture fut,
         AffinityTopologyVersion topVer) throws IgniteCheckedException {
         Map<UUID, CacheContinuousQueryListener> lsnrCol = 
updateListeners(internal, preload);
 
@@ -258,7 +254,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 internal,
                 partId,
                 primary,
-                fireEvnt,
                 preload,
                 updateCntr,
                 fut,
@@ -274,7 +269,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
      * @param internal Internal entry (internal key or not user cache),
      * @param partId Partition.
      * @param primary {@code True} if called on primary node.
-     * @param fireEvnt Fired event immediately.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
      * @param topVer Topology version.
@@ -289,10 +283,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         boolean internal,
         int partId,
         boolean primary,
-        boolean fireEvnt,
         boolean preload,
         long updateCntr,
-        @Nullable IgniteInternalFuture<?> fut,
+        @Nullable GridDhtAtomicUpdateFuture fut,
         AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
@@ -303,7 +296,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         boolean hasOldVal = oldVal != null;
 
         if (!hasNewVal && !hasOldVal)
-            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, 
fireEvnt, topVer);
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer);
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : 
UPDATED;
 
@@ -343,7 +336,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt, fut);
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
         }
     }
 
@@ -397,7 +390,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, 
e0);
 
-                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true, null);
+                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 82b3f4b..3127dcb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -26,6 +26,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -537,7 +538,8 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
         UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr) throws IgniteCheckedException,
+        @Nullable Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
index 5d7afdc..a556abd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
@@ -90,4 +91,43 @@ public class 
CacheContinuousQueryFactoryAsyncFilterRandomOperationTest
             return new NonSerializableAsyncFilter();
         }
     }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, 
QueryTestValue>> noOpFilterFactory() {
+        return FactoryBuilder.factoryOf(NoopAsyncFilter.class);
+    }
+
+    /**
+     *
+     */
+    @IgniteAsyncCallback
+    protected static class NoopAsyncFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, 
Externalizable {
+        /** */
+        public NoopAsyncFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends 
QueryTestKey, ? extends QueryTestValue> event)
+            throws CacheEntryListenerException {
+            assertTrue("Failed. Current thread name: " + 
Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("contQry-"));
+
+            assertFalse("Failed. Current thread name: " + 
Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("sys-"));
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            // No-op.
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
index e728b91..8a7eb86 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
@@ -62,7 +62,7 @@ import org.apache.ignite.transactions.Transaction;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  *
@@ -163,16 +163,6 @@ public class CacheContinuousQueryOrderingEventTest extends 
GridCommonAbstractTes
     /**
      * @throws Exception If failed.
      */
-    public void testAtomicReplicated() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC,
-            CacheMemoryMode.ONHEAP_TIERED);
-
-        doOrderingTest(ccfg, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testAtomicReplicatedOffheap() throws Exception {
         CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC,
             CacheMemoryMode.OFFHEAP_TIERED);
@@ -183,16 +173,6 @@ public class CacheContinuousQueryOrderingEventTest extends 
GridCommonAbstractTes
     /**
      * @throws Exception If failed.
      */
-    public void testAtomicOnheapWithoutBackup() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC,
-            CacheMemoryMode.ONHEAP_TIERED);
-
-        doOrderingTest(ccfg, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testTxOnheapTwoBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL,
             CacheMemoryMode.ONHEAP_TIERED);
@@ -203,7 +183,7 @@ public class CacheContinuousQueryOrderingEventTest extends 
GridCommonAbstractTes
     /**
      * @throws Exception If failed.
      */
-    public void testTxOnheap() throws Exception {
+    public void testTxOnheapWithoutBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.TRANSACTIONAL,
             CacheMemoryMode.ONHEAP_TIERED);
 
@@ -585,7 +565,7 @@ public class CacheContinuousQueryOrderingEventTest extends 
GridCommonAbstractTes
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setCacheMode(cacheMode);
         ccfg.setMemoryMode(memoryMode);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setWriteSynchronizationMode(PRIMARY_SYNC);
         ccfg.setAtomicWriteOrderMode(PRIMARY);
 
         if (cacheMode == PARTITIONED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index e9fbf70..e5d007d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -598,6 +598,9 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue>> evts =
                 new CopyOnWriteArrayList<>();
 
+            if (noOpFilterFactory() != null)
+                qry.setRemoteFilterFactory(noOpFilterFactory());
+
             qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, 
QueryTestValue>() {
                 @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends QueryTestKey,
                     ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {
@@ -699,6 +702,13 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     }
 
     /**
+     * @return No-op filter factory for batch operations.
+     */
+    protected Factory<? extends CacheEntryEventFilter<QueryTestKey, 
QueryTestValue>> noOpFilterFactory() {
+        return null;
+    }
+
+    /**
      * @param ccfg Cache configuration.
      * @throws Exception If failed.
      */
@@ -711,6 +721,9 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue>> evts =
                 new CopyOnWriteArrayList<>();
 
+            if (noOpFilterFactory() != null)
+                qry.setRemoteFilterFactory(noOpFilterFactory());
+
             qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, 
QueryTestValue>() {
                 @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends QueryTestKey,
                     ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {

Reply via email to