IGNITE-143 - Continuous queries refactoring

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3520bfde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3520bfde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3520bfde

Branch: refs/heads/ignite-143
Commit: 3520bfdeb3248f8305978bf35a804e9f2ac2827c
Parents: 4759d02
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Thu Feb 12 13:03:24 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Thu Feb 12 13:03:24 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  21 +-
 .../processors/cache/GridCacheProcessor.java    |  11 +-
 .../processors/cache/GridCacheProjectionEx.java |   5 +
 .../processors/cache/IgniteCacheProxy.java      |  76 +-
 .../CacheDataStructuresManager.java             |  71 +-
 .../continuous/CacheContinuousQueryEntry.java   |   8 -
 .../continuous/CacheContinuousQueryHandler.java |  94 +--
 .../CacheContinuousQueryListener.java           |   4 +-
 .../continuous/CacheContinuousQueryManager.java | 709 +++++++++----------
 .../service/GridServiceProcessor.java           |  37 +-
 .../hadoop/jobtracker/GridHadoopJobTracker.java |  53 +-
 11 files changed, 504 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 47a0d10..1783352 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1166,7 +1166,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             }
 
             if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1325,7 +1325,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 }
 
                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdate(this, key, null, 
null, old, oldBytes, false);
+                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
             }
@@ -1602,7 +1602,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
+            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -2139,7 +2139,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -3137,16 +3137,9 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.isLocal() || cctx.isReplicated() ||
-                        cctx.affinity().primary(cctx.localNode(), key, 
topVer)) {
-                        cctx.continuousQueries().onEntryUpdate(this,
-                            key,
-                            val,
-                            valueBytesUnlocked(),
-                            null,
-                            null,
-                            preload);
-                    }
+                    if (!preload && (cctx.isLocal() || cctx.isReplicated() ||
+                        cctx.affinity().primary(cctx.localNode(), key, 
topVer)))
+                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e7fa211..e596605 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1589,15 +1589,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      *
      * @return Utility cache.
      */
-    public <K, V> GridCache<K, V> utilityCache() {
-        return cache(CU.UTILITY_CACHE_NAME);
-    }
-
-    /**
-     * @return Utility cache.
-     */
-    public <K, V> IgniteCache<K, V> utilityJCache() {
-        return jcache(CU.UTILITY_CACHE_NAME);
+    public <K, V> GridCacheAdapter<K, V> utilityCache() {
+        return internalCache(CU.UTILITY_CACHE_NAME);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
index 1d30458..d080ddf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
@@ -453,4 +453,9 @@ public interface GridCacheProjectionEx<K, V> extends 
CacheProjection<K, V> {
     public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> 
invokeAllAsync(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args);
+
+    /**
+     * @return Context.
+     */
+    public GridCacheContext<K, V> context();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3e8d461..72e7e46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -24,8 +24,6 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.processors.cache.query.continuous.*;
-import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -33,7 +31,6 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
-import org.apache.ignite.plugin.security.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -46,8 +43,6 @@ import java.io.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-
 /**
  * Cache proxy.
  */
@@ -337,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * Executes continuous query.
      *
      * @param qry Query.
-     * @param loc Whether query is local.
+     * @param loc Local flag.
      * @return Initial iteration cursor.
      */
     private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, 
boolean loc) {
@@ -348,63 +343,14 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         if (qry.getLocalListener() == null)
             throw new IgniteException("Mandatory local listener is not set for 
the query: " + qry);
 
-        ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
-
-        IgniteEx ignite = ctx.kernalContext().grid();
-
-        ClusterGroup grp = (loc ? ignite.forLocal() : 
ignite).forCacheNodes(ctx.name());
-
-        Collection<ClusterNode> nodes = grp.nodes();
-
-        if (nodes.isEmpty())
-            throw new ClusterTopologyException("Failed to execute continuous 
query (empty cluster group is " +
-                "provided): " + qry);
-
-        boolean skipPrimaryCheck = false;
-
-        switch (ctx.config().getCacheMode()) {
-            case LOCAL:
-                if (!nodes.contains(ctx.localNode()))
-                    throw new ClusterTopologyException("Continuous query for 
LOCAL cache can be executed " +
-                        "only locally (provided projection contains remote 
nodes only): " + qry);
-                else if (nodes.size() > 1)
-                    U.warn(log, "Continuous query for LOCAL cache will be 
executed locally (provided projection is " +
-                        "ignored): " + this);
-
-                grp = grp.forNode(ctx.localNode());
-
-                break;
-
-            case REPLICATED:
-                if (nodes.size() == 1 && 
F.first(nodes).equals(ctx.localNode())) {
-                    CacheDistributionMode distributionMode = 
ctx.config().getDistributionMode();
-
-                    if (distributionMode == PARTITIONED_ONLY || 
distributionMode == NEAR_PARTITIONED)
-                        skipPrimaryCheck = true;
-                }
-
-                break;
-        }
-
-        int taskNameHash = ctx.kernalContext().security().enabled() ?
-            ctx.kernalContext().job().currentTaskNameHash() : 0;
-
-        GridContinuousHandler hnd = new CacheContinuousQueryHandler<>(
-            ctx.name(),
-            ctx.continuousQueries().topic(),
-            qry.getLocalListener(),
-            qry.getRemoteFilter(),
-            false,
-            false,
-            false,
-            true,
-            skipPrimaryCheck,
-            taskNameHash,
-            false);
-
         try {
-            final UUID routineId = 
ctx.kernalContext().continuous().startRoutine(hnd, qry.getBufferSize(),
-                qry.getTimeInterval(), qry.isAutoUnsubscribe(), 
grp.predicate()).get();
+            final UUID routineId = ctx.continuousQueries().executeQuery(
+                qry.getLocalListener(),
+                qry.getRemoteFilter(),
+                qry.getBufferSize(),
+                qry.getTimeInterval(),
+                qry.isAutoUnsubscribe(),
+                loc ? ctx.grid().forLocal() : null);
 
             final QueryCursor<Cache.Entry<K, V>> cur;
 
@@ -1226,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            ctx.continuousQueries().registerCacheEntryListener(lsnrCfg, true);
+            ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);
@@ -1237,11 +1183,11 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
+    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            ctx.continuousQueries().deregisterCacheEntryListener(lsnrCfg);
+            ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 5ee2b31..11c85ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.cache.datastructures;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -32,8 +31,6 @@ import org.apache.ignite.resources.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.*;
-import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.*;
 import java.io.*;
 import java.util.*;
@@ -60,8 +57,11 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
     /** Queue header view.  */
     private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> 
queueHdrView;
 
+    /** System cache. */
+    private GridCacheAdapter<Object, Object> sysCache;
+
     /** Query notifying about queue update. */
-    private QueryCursor<Cache.Entry<Object, Object>> queueQryCur;
+    private UUID queueQryId;
 
     /** Queue query creation guard. */
     private final AtomicBoolean queueQryGuard = new AtomicBoolean();
@@ -89,6 +89,8 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
         try {
             queueHdrView = 
cctx.cache().projection(GridCacheQueueHeaderKey.class, 
GridCacheQueueHeader.class);
 
+            sysCache = cctx.kernalContext().cache().utilityCache();
+
             initFlag = true;
         }
         finally {
@@ -100,8 +102,8 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
     @Override protected void onKernalStop0(boolean cancel) {
         busyLock.block();
 
-        if (queueQryCur != null)
-            queueQryCur.close();
+        if (queueQryId != null)
+            cctx.continuousQueries().cancelInternalQuery(queueQryId);
 
         for (GridCacheQueueProxy q : queuesMap.values())
             q.delegate().onKernalStop();
@@ -184,48 +186,43 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
                 return null;
 
             if (queueQryGuard.compareAndSet(false, true)) {
-                ContinuousQuery<Object, Object> qry = Query.continuous();
-
-                qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
-                    @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                        if (!busyLock.enterBusy())
-                            return;
+                queueQryId = 
sysCache.context().continuousQueries().executeInternalQuery(
+                    new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            if (!busyLock.enterBusy())
+                                return;
 
-                        try {
-                            for (CacheEntryEvent<?, ?> e : evts) {
-                                GridCacheQueueHeaderKey key = 
(GridCacheQueueHeaderKey)e.getKey();
-                                GridCacheQueueHeader hdr = 
(GridCacheQueueHeader)e.getValue();
+                            try {
+                                for (CacheEntryEvent<?, ?> e : evts) {
+                                    GridCacheQueueHeaderKey key = 
(GridCacheQueueHeaderKey)e.getKey();
+                                    GridCacheQueueHeader hdr = 
(GridCacheQueueHeader)e.getValue();
 
-                                for (final GridCacheQueueProxy queue : 
queuesMap.values()) {
-                                    if (queue.name().equals(key.queueName())) {
-                                        if (hdr == null) {
-                                            GridCacheQueueHeader oldHdr = 
(GridCacheQueueHeader)e.getOldValue();
+                                    for (final GridCacheQueueProxy queue : 
queuesMap.values()) {
+                                        if 
(queue.name().equals(key.queueName())) {
+                                            if (hdr == null) {
+                                                GridCacheQueueHeader oldHdr = 
(GridCacheQueueHeader)e.getOldValue();
 
-                                            assert oldHdr != null;
+                                                assert oldHdr != null;
 
-                                            if 
(oldHdr.id().equals(queue.delegate().id())) {
-                                                
queue.delegate().onRemoved(false);
+                                                if 
(oldHdr.id().equals(queue.delegate().id())) {
+                                                    
queue.delegate().onRemoved(false);
 
-                                                
queuesMap.remove(queue.delegate().id());
+                                                    
queuesMap.remove(queue.delegate().id());
+                                                }
                                             }
+                                            else
+                                                
queue.delegate().onHeaderChanged(hdr);
                                         }
-                                        else
-                                            
queue.delegate().onHeaderChanged(hdr);
                                     }
                                 }
                             }
+                            finally {
+                                busyLock.leaveBusy();
+                            }
                         }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
-                    }
-                });
-
-                qry.setRemoteFilter(new QueueHeaderPredicate());
-
-                IgniteCache<Object, Object> jCache = 
cctx.kernalContext().cache().utilityJCache();
-
-                queueQryCur = cctx.isLocal() || cctx.isReplicated() ? 
jCache.localQuery(qry) : jCache.query(qry);
+                    },
+                    new QueueHeaderPredicate(),
+                    cctx.isLocal() || cctx.isReplicated());
             }
 
             GridCacheQueueProxy queue = queuesMap.get(hdr.id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 92f0f0a..c8d9fec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -172,14 +172,6 @@ class CacheContinuousQueryEntry<K, V> implements 
GridCacheDeployable, Externaliz
         return oldVal;
     }
 
-    /**
-     * Nullifies old value.
-     */
-    void nullifyOldValue() {
-        oldVal = null;
-        oldValBytes = null;
-    }
-
     /** {@inheritDoc} */
     @Override public void prepare(GridDeploymentInfo depInfo) {
         this.depInfo = depInfo;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 5c03488..ece5a01 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -29,8 +29,8 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.*;
+import javax.cache.event.EventType;
 import java.io.*;
 import java.util.*;
 
@@ -49,33 +49,30 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     /** Topic for ordered messages. */
     private Object topic;
 
-    /** Local callback. */
-    private CacheEntryUpdatedListener<K, V> locLsnr;
+    /** Local listener. */
+    private transient CacheEntryUpdatedListener<K, V> locLsnr;
 
-    /** Filter. */
+    /** Remote filter. */
     private CacheEntryEventFilter<K, V> rmtFilter;
 
     /** Deployable object for filter. */
-    private DeployableObject filterDep;
+    private DeployableObject rmtFilterDep;
 
     /** Internal flag. */
     private boolean internal;
 
-    /** Entry listener flag. */
-    private boolean entryLsnr;
+    /** Old value required flag. */
+    private boolean oldValRequired;
 
-    /** Synchronous listener flag. */
+    /** Synchronous flag. */
     private boolean sync;
 
-    /** {@code True} if old value is required. */
-    private boolean oldVal;
+    /** Ignore expired events flag. */
+    private boolean ignoreExpired;
 
     /** Task name hash code. */
     private int taskHash;
 
-    /** Keep portable flag. */
-    private boolean keepPortable;
-
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
@@ -93,30 +90,36 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
      * @param topic Topic for ordered messages.
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
-     * @param internal If {@code true} then query is notified about internal 
entries updates.
-     * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
-     * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
-     * @param oldVal {@code True} if old value is required.
+     * @param internal Internal flag.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired events flag.
      * @param skipPrimaryCheck Whether to skip primary check for REPLICATED 
cache.
      * @param taskHash Task name hash code.
      */
-    public CacheContinuousQueryHandler(@Nullable String cacheName, Object 
topic,
-        CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> 
rmtFilter, boolean internal,
-        boolean entryLsnr, boolean sync, boolean oldVal, boolean 
skipPrimaryCheck, int taskHash, boolean keepPortable) {
+    public CacheContinuousQueryHandler(
+        String cacheName,
+        Object topic,
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        CacheEntryEventFilter<K, V> rmtFilter,
+        boolean internal,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        int taskHash,
+        boolean skipPrimaryCheck) {
         assert topic != null;
         assert locLsnr != null;
-        assert !sync || entryLsnr;
 
         this.cacheName = cacheName;
         this.topic = topic;
         this.locLsnr = locLsnr;
         this.rmtFilter = rmtFilter;
         this.internal = internal;
-        this.entryLsnr = entryLsnr;
+        this.oldValRequired = oldValRequired;
         this.sync = sync;
-        this.oldVal = oldVal;
+        this.ignoreExpired = ignoreExpired;
         this.taskHash = taskHash;
-        this.keepPortable = keepPortable;
         this.skipPrimaryCheck = skipPrimaryCheck;
     }
 
@@ -170,7 +173,11 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                 }
             }
 
-            @Override public void onEntryUpdate(CacheContinuousQueryEvent<K, 
V> evt, boolean primary, boolean recordIgniteEvt) {
+            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, 
V> evt, boolean primary,
+                boolean recordIgniteEvt) {
+                if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
+                    return;
+
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
@@ -190,9 +197,6 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                 }
 
                 if (notify) {
-                    if (!oldVal)
-                        evt.entry().nullifyOldValue();
-
                     if (loc)
                         locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt));
                     else {
@@ -218,7 +222,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                         }
                     }
 
-                    if (!entryLsnr && recordIgniteEvt) {
+                    if (recordIgniteEvt) {
                         ctx.event().record(new CacheQueryReadEvent<>(
                             ctx.discovery().localNode(),
                             "Continuous query executed.",
@@ -242,16 +246,20 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             }
 
             @Override public void onUnregister() {
-                if (rmtFilter != null && rmtFilter instanceof 
CacheContinuousQueryFilterEx)
+                if (rmtFilter instanceof CacheContinuousQueryFilterEx)
                     
((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister();
             }
 
-            @Nullable private String taskName() {
+            @Override public boolean oldValueRequired() {
+                return oldValRequired;
+            }
+
+            private String taskName() {
                 return ctx.security().enabled() ? 
ctx.task().resolveTaskName(taskHash) : null;
             }
         };
 
-        return manager(ctx).registerListener(routineId, lsnr, internal, 
entryLsnr);
+        return manager(ctx).registerListener(routineId, lsnr, internal);
     }
 
     /** {@inheritDoc} */
@@ -332,7 +340,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         assert ctx.config().isPeerClassLoadingEnabled();
 
         if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
-            filterDep = new DeployableObject(rmtFilter, ctx);
+            rmtFilterDep = new DeployableObject(rmtFilter, ctx);
     }
 
     /** {@inheritDoc} */
@@ -341,8 +349,8 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         assert ctx != null;
         assert ctx.config().isPeerClassLoadingEnabled();
 
-        if (filterDep != null)
-            rmtFilter = filterDep.unmarshal(nodeId, ctx);
+        if (rmtFilterDep != null)
+            rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx);
     }
 
     /** {@inheritDoc} */
@@ -355,21 +363,20 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         U.writeString(out, cacheName);
         out.writeObject(topic);
 
-        boolean b = filterDep != null;
+        boolean b = rmtFilterDep != null;
 
         out.writeBoolean(b);
 
         if (b)
-            out.writeObject(filterDep);
+            out.writeObject(rmtFilterDep);
         else
             out.writeObject(rmtFilter);
 
         out.writeBoolean(internal);
-        out.writeBoolean(entryLsnr);
+        out.writeBoolean(oldValRequired);
         out.writeBoolean(sync);
-        out.writeBoolean(oldVal);
+        out.writeBoolean(ignoreExpired);
         out.writeInt(taskHash);
-        out.writeBoolean(keepPortable);
     }
 
     /** {@inheritDoc} */
@@ -381,16 +388,15 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         boolean b = in.readBoolean();
 
         if (b)
-            filterDep = (DeployableObject)in.readObject();
+            rmtFilterDep = (DeployableObject)in.readObject();
         else
             rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject();
 
         internal = in.readBoolean();
-        entryLsnr = in.readBoolean();
+        oldValRequired = in.readBoolean();
         sync = in.readBoolean();
-        oldVal = in.readBoolean();
+        ignoreExpired = in.readBoolean();
         taskHash = in.readInt();
-        keepPortable = in.readBoolean();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 69e9523..21c45f9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -32,10 +32,12 @@ interface CacheContinuousQueryListener<K, V> {
      * @param e Entry.
      * @param recordIgniteEvt Whether to record event.
      */
-    public void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
 
     /**
      * Listener unregistered callback.
      */
     public void onUnregister();
+
+    public boolean oldValueRequired();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 52e915e..92416b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -18,13 +18,15 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
+import org.apache.ignite.plugin.security.*;
 import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
 
 import javax.cache.configuration.*;
 import javax.cache.event.*;
@@ -34,6 +36,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static javax.cache.event.EventType.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.GridTopic.*;
 
@@ -41,8 +44,17 @@ import static org.apache.ignite.internal.GridTopic.*;
  * Continuous queries manager.
  */
 public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
-    /** Ordered topic prefix. */
-    private String topicPrefix;
+    /** */
+    private static final byte CREATED_FLAG = 0b0001;
+
+    /** */
+    private static final byte UPDATED_FLAG = 0b0010;
+
+    /** */
+    private static final byte REMOVED_FLAG = 0b0100;
+
+    /** */
+    private static final byte EXPIRED_FLAG = 0b1000;
 
     /** Listeners. */
     private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> 
lsnrs = new ConcurrentHashMap8<>();
@@ -59,9 +71,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
     /** Query sequence number for message topic. */
     private final AtomicLong seq = new AtomicLong();
 
-//    /** Continues queries created for cache event listeners. */
-//    private final ConcurrentMap<CacheEntryListenerConfiguration, 
CacheContinuousQuery<K, V>> lsnrQrys =
-//        new ConcurrentHashMap8<>();
+    /** JCache listeners. */
+    private final ConcurrentMap<CacheEntryListenerConfiguration, JCacheQuery> 
jCacheLsnrs =
+        new ConcurrentHashMap8<>();
+
+    /** Ordered topic prefix. */
+    private String topicPrefix;
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
@@ -72,11 +87,11 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = 
cctx.config().getCacheEntryListenerConfigurations();
+        Iterable<CacheEntryListenerConfiguration<K, V>> cfgs = 
cctx.config().getCacheEntryListenerConfigurations();
 
-        if (lsnrCfgs != null) {
-            for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs)
-                registerCacheEntryListener(cfg, false);
+        if (cfgs != null) {
+            for (CacheEntryListenerConfiguration<K, V> cfg : cfgs)
+                executeJCacheQuery(cfg, true);
         }
     }
 
@@ -84,22 +99,15 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
     @Override protected void onKernalStop0(boolean cancel) {
         super.onKernalStop0(cancel);
 
-//        for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) {
-//            try {
-//                deregisterCacheEntryListener(lsnrCfg);
-//            }
-//            catch (IgniteCheckedException e) {
-//                if (log.isDebugEnabled())
-//                    log.debug("Failed to remove cache entry listener: " + e);
-//            }
-//        }
-    }
-
-    /**
-     * @return New topic.
-     */
-    public Object topic() {
-        return TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement());
+        for (JCacheQuery lsnr : jCacheLsnrs.values()) {
+            try {
+                lsnr.cancel();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to stop JCache entry listener: " + 
e.getMessage());
+            }
+        }
     }
 
     /**
@@ -109,16 +117,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param newBytes New value bytes.
      * @param oldVal Old value.
      * @param oldBytes Old value bytes.
-     * @param preload {@code True} if entry is updated during preloading.
      * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryUpdate(GridCacheEntryEx<K, V> e,
-        K key,
-        @Nullable V newVal,
-        @Nullable GridCacheValueBytes newBytes,
-        V oldVal,
-        @Nullable GridCacheValueBytes oldBytes,
-        boolean preload) throws IgniteCheckedException {
+    public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, 
GridCacheValueBytes newBytes,
+        V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException {
         assert e != null;
         assert key != null;
 
@@ -147,12 +149,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         boolean primary = e.wrap(false).primary();
         boolean recordIgniteEvt = !e.isInternal() && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-        for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
-//            if (preload && lsnr.entryListener())
-//                continue;
-
-            lsnr.onEntryUpdate(evt, primary, recordIgniteEvt);
-        }
+        for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values())
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
     }
 
     /**
@@ -161,10 +159,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
      * @param oldVal Old value.
      * @param oldBytes Old value bytes.
      */
-    public void onEntryExpired(GridCacheEntryEx<K, V> e,
-        K key,
-        V oldVal,
-        @Nullable GridCacheValueBytes oldBytes) {
+    public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, 
GridCacheValueBytes oldBytes) {
+        assert e != null;
+        assert key != null;
+
         if (e.isInternal())
             return;
 
@@ -179,106 +177,185 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
             CacheContinuousQueryEvent<K, V> evt = new 
CacheContinuousQueryEvent<>(
                 cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0);
 
-            for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
-//                if (!lsnr.entryListener())
-//                    continue;
+            for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values())
+                lsnr.onEntryUpdated(evt, true, false);
+        }
+    }
 
-                lsnr.onEntryUpdate(evt, true, false);
-            }
+    /**
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param grp Cluster group.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public UUID executeQuery(CacheEntryUpdatedListener<K, V> locLsnr, 
CacheEntryEventFilter<K, V> rmtFilter,
+        int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup 
grp) throws IgniteCheckedException {
+        return executeQuery0(
+            locLsnr,
+            rmtFilter,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            false,
+            true,
+            false,
+            true,
+            grp);
+    }
+
+    /**
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param loc Local flag.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public UUID executeInternalQuery(CacheEntryUpdatedListener<K, V> locLsnr, 
CacheEntryEventFilter<K, V> rmtFilter,
+        boolean loc) throws IgniteCheckedException {
+        return executeQuery0(
+            locLsnr,
+            rmtFilter,
+            ContinuousQuery.DFLT_BUF_SIZE,
+            ContinuousQuery.DFLT_TIME_INTERVAL,
+            ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
+            true,
+            true,
+            false,
+            true,
+            loc ? cctx.grid().forLocal() : null);
+    }
+
+    public void cancelInternalQuery(UUID routineId) {
+        try {
+            cctx.kernalContext().continuous().stopRoutine(routineId).get();
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to stop internal continuous query: " + 
e.getMessage());
         }
     }
 
     /**
-     * @param lsnrCfg Listener configuration.
-     * @param addToCfg If {@code true} adds listener configuration to cache 
configuration.
-     * @throws IgniteCheckedException If failed.
+     * @param cfg Listener configuration.
+     * @param onStart Whether listener is created on node start.
+     * @throws IgniteCheckedException In case of error.
      */
-    @SuppressWarnings("unchecked")
-    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, 
V> lsnrCfg, boolean addToCfg)
+    public void executeJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, 
boolean onStart)
         throws IgniteCheckedException {
-//        GridCacheContinuousQueryAdapter<K, V> qry = null;
-//
-//        try {
-//            A.notNull(lsnrCfg, "lsnrCfg");
-//
-//            Factory<CacheEntryListener<? super K, ? super V>> factory = 
lsnrCfg.getCacheEntryListenerFactory();
-//
-//            A.notNull(factory, "cacheEntryListenerFactory");
-//
-//            CacheEntryListener lsnr = factory.create();
-//
-//            A.notNull(lsnr, "lsnr");
-//
-//            IgniteCacheProxy<K, V> cache= 
cctx.kernalContext().cache().jcache(cctx.name());
-//
-//            EntryListenerCallback cb = new EntryListenerCallback(cache, 
lsnr);
-//
-//            if (!(cb.create() || cb.update() || cb.remove() || cb.expire()))
-//                throw new IllegalArgumentException("Listener must implement 
one of CacheEntryListener sub-interfaces.");
-//
-//            qry = (GridCacheContinuousQueryAdapter<K, 
V>)cctx.cache().queries().createContinuousQuery();
-//
-//            CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, 
qry);
-//
-//            if (old != null)
-//                throw new IllegalArgumentException("Listener is already 
registered for configuration: " + lsnrCfg);
-//
-//            qry.autoUnsubscribe(true);
-//
-//            qry.bufferSize(1);
-//
-////            qry.localCallback(cb);
-//
-//            EntryListenerFilter<K, V> fltr = new 
EntryListenerFilter<>(cb.create(),
-//                cb.update(),
-//                cb.remove(),
-//                cb.expire(),
-//                lsnrCfg.getCacheEntryEventFilterFactory(),
-//                cctx.kernalContext().grid(),
-//                cctx.name());
-//
-////            qry.remoteFilter(fltr);
-//
-//            qry.execute(null, false, true, lsnrCfg.isSynchronous(), 
lsnrCfg.isOldValueRequired());
-//
-//            if (addToCfg)
-//                cctx.config().addCacheEntryListenerConfiguration(lsnrCfg);
-//        }
-//        catch (IgniteCheckedException e) {
-//            lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to 
execute it.
-//
-//            throw e;
-//        }
+        JCacheQuery lsnr = new JCacheQuery(cfg, onStart);
+
+        JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr);
+
+        if (old != null)
+            throw new IgniteCheckedException("Listener is already registered for configuration: " + cfg);
+
+        try {
+            lsnr.execute();
+        }
+        catch (IgniteCheckedException e) {
+            cancelJCacheQuery(cfg);
+
+            throw e;
+        }
     }
 
     /**
-     * @param lsnrCfg Listener configuration.
-     * @throws IgniteCheckedException If failed.
+     * @param cfg Listener configuration.
+     * @throws IgniteCheckedException In case of error.
      */
-    @SuppressWarnings("unchecked")
-    public void deregisterCacheEntryListener(CacheEntryListenerConfiguration 
lsnrCfg) throws IgniteCheckedException {
-        A.notNull(lsnrCfg, "lsnrCfg");
-
-//        CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg);
-//
-//        if (qry != null) {
-//            cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg);
-//
-//            qry.close();
-//        }
+    public void cancelJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg) 
throws IgniteCheckedException {
+        JCacheQuery lsnr = jCacheLsnrs.remove(cfg);
+
+        if (lsnr != null)
+            lsnr.cancel();
+    }
+
+    /**
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param internal Internal flag.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired event flag.
+     * @param grp Cluster group.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, 
CacheEntryEventFilter<K, V> rmtFilter,
+        int bufSize, long timeInterval, boolean autoUnsubscribe, boolean 
internal, boolean oldValRequired,
+        boolean sync, boolean ignoreExpired, ClusterGroup grp) throws 
IgniteCheckedException {
+        cctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+        if (grp == null)
+            grp = cctx.kernalContext().grid();
+
+        Collection<ClusterNode> nodes = grp.nodes();
+
+        if (nodes.isEmpty())
+            throw new ClusterTopologyException("Failed to execute continuous 
query (empty cluster group is " +
+                "provided).");
+
+        boolean skipPrimaryCheck = false;
+
+        switch (cctx.config().getCacheMode()) {
+            case LOCAL:
+                if (!nodes.contains(cctx.localNode()))
+                    throw new ClusterTopologyException("Continuous query for 
LOCAL cache can be executed " +
+                        "only locally (provided projection contains remote 
nodes only).");
+                else if (nodes.size() > 1)
+                    U.warn(log, "Continuous query for LOCAL cache will be 
executed locally (provided projection is " +
+                        "ignored).");
+
+                grp = grp.forNode(cctx.localNode());
+
+                break;
+
+            case REPLICATED:
+                if (nodes.size() == 1 && 
F.first(nodes).equals(cctx.localNode())) {
+                    CacheDistributionMode distributionMode = 
cctx.config().getDistributionMode();
+
+                    if (distributionMode == PARTITIONED_ONLY || 
distributionMode == NEAR_PARTITIONED)
+                        skipPrimaryCheck = true;
+                }
+
+                break;
+        }
+
+        int taskNameHash = !internal && 
cctx.kernalContext().security().enabled() ?
+            cctx.kernalContext().job().currentTaskNameHash() : 0;
+
+        GridContinuousHandler hnd = new CacheContinuousQueryHandler<>(
+            cctx.name(),
+            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
+            locLsnr,
+            rmtFilter,
+            internal,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            taskNameHash,
+            skipPrimaryCheck);
+
+        return cctx.kernalContext().continuous().startRoutine(hnd, bufSize, 
timeInterval,
+            autoUnsubscribe, grp.predicate()).get();
     }
 
     /**
      * @param lsnrId Listener ID.
      * @param lsnr Listener.
      * @param internal Internal flag.
-     * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
      * @return Whether listener was actually registered.
      */
     boolean registerListener(UUID lsnrId,
         CacheContinuousQueryListener<K, V> lsnr,
-        boolean internal,
-        boolean entryLsnr) {
+        boolean internal) {
         boolean added;
 
         if (internal) {
@@ -324,295 +401,215 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
     }
 
     /**
-     *
      */
-    static class EntryListenerFilter<K1, V1> implements
-        IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable {
+    private class JCacheQuery {
         /** */
-        private static final long serialVersionUID = 0L;
+        private final CacheEntryListenerConfiguration<K, V> cfg;
 
         /** */
-        private boolean create;
+        private final boolean onStart;
 
         /** */
-        private boolean update;
+        private volatile UUID routineId;
 
-        /** */
-        private boolean rmv;
+        /**
+         * @param cfg Listener configuration.
+         */
+        private JCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean 
onStart) {
+            this.cfg = cfg;
+            this.onStart = onStart;
+        }
 
-        /** */
-        private boolean expire;
+        /**
+         * @throws IgniteCheckedException In case of error.
+         */
+        @SuppressWarnings("unchecked")
+        void execute() throws IgniteCheckedException {
+            if (!onStart)
+                cctx.config().addCacheEntryListenerConfiguration(cfg);
+
+            CacheEntryListener<? super K, ? super V> locLsnrImpl = 
cfg.getCacheEntryListenerFactory().create();
+
+            if (locLsnrImpl == null)
+                throw new IgniteCheckedException("Local CacheEntryListener is 
mandatory and can't be null.");
+
+            byte types = 0;
+
+            types |= locLsnrImpl instanceof CacheEntryCreatedListener ? 
CREATED_FLAG : 0;
+            types |= locLsnrImpl instanceof CacheEntryUpdatedListener ? 
UPDATED_FLAG : 0;
+            types |= locLsnrImpl instanceof CacheEntryRemovedListener ? 
REMOVED_FLAG : 0;
+            types |= locLsnrImpl instanceof CacheEntryExpiredListener ? 
EXPIRED_FLAG : 0;
+
+            if (types == 0)
+                throw new IgniteCheckedException("Listener must implement one 
of CacheEntryListener sub-interfaces.");
+
+            CacheEntryUpdatedListener<K, V> locLsnr = 
(CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener<>(
+                locLsnrImpl);
+
+            CacheEntryEventFilter<K, V> rmtFilter = (CacheEntryEventFilter<K, 
V>)new JCacheQueryRemoteFilter<>(
+                cfg.getCacheEntryEventFilterFactory().create(), types);
+
+            routineId = executeQuery0(
+                locLsnr,
+                rmtFilter,
+                ContinuousQuery.DFLT_BUF_SIZE,
+                ContinuousQuery.DFLT_TIME_INTERVAL,
+                ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
+                false,
+                cfg.isOldValueRequired(),
+                cfg.isSynchronous(),
+                false,
+                null);
+        }
 
-        /** */
-        private Factory<CacheEntryEventFilter<? super K1, ? super V1>> 
fltrFactory;
+        /**
+         * @throws IgniteCheckedException In case of error.
+         */
+        @SuppressWarnings("unchecked")
+        void cancel() throws IgniteCheckedException {
+            UUID routineId0 = routineId;
 
-        /** */
-        private CacheEntryEventFilter fltr;
+            assert routineId0 != null;
 
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
+            cctx.kernalContext().continuous().stopRoutine(routineId0).get();
 
-        /** */
-        private IgniteCache cache;
+            cctx.config().removeCacheEntryListenerConfiguration(cfg);
+        }
+    }
 
+    /**
+     */
+    private static class JCacheQueryLocalListener<K, V> implements 
CacheEntryUpdatedListener<K, V> {
         /** */
-        private String cacheName;
+        private CacheEntryListener<K, V> impl;
 
         /**
-         *
+         * @param impl Listener.
          */
-        public EntryListenerFilter() {
-            // No-op.
-        }
+        private JCacheQueryLocalListener(CacheEntryListener<K, V> impl) {
+            assert impl != null;
 
-        /**
-         * @param create {@code True} if listens for create events.
-         * @param update {@code True} if listens for create events.
-         * @param rmv {@code True} if listens for remove events.
-         * @param expire {@code True} if listens for expire events.
-         * @param fltrFactory Filter factory.
-         * @param ignite Ignite instance.
-         * @param cacheName Cache name.
-         */
-        EntryListenerFilter(
-            boolean create,
-            boolean update,
-            boolean rmv,
-            boolean expire,
-            Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory,
-            Ignite ignite,
-            @Nullable String cacheName) {
-            this.create = create;
-            this.update = update;
-            this.rmv = rmv;
-            this.expire = expire;
-            this.fltrFactory = fltrFactory;
-            this.ignite = ignite;
-            this.cacheName = cacheName;
-
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
-
-            cache = ignite.jcache(cacheName);
-
-            assert cache != null : cacheName;
+            this.impl = impl;
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> 
entry) {
-            return false;
-
-//            try {
-//                EventType evtType = 
(((GridCacheContinuousQueryEntry)entry).eventType());
-//
-//                switch (evtType) {
-//                    case EXPIRED:
-//                        if (!expire)
-//                            return false;
-//
-//                        break;
-//
-//                    case REMOVED:
-//                        if (!rmv)
-//                            return false;
-//
-//                        break;
-//
-//                    case CREATED:
-//                        if (!create)
-//                            return false;
-//
-//                        break;
-//
-//                    case UPDATED:
-//                        if (!update)
-//                            return false;
-//
-//                        break;
-//
-//                    default:
-//                        assert false : evtType;
-//                }
-//
-//                if (fltr == null)
-//                    return true;
-//
-//                if (cache == null) {
-//                    cache = ignite.jcache(cacheName);
-//
-//                    assert cache != null : cacheName;
-//                }
-//
-//                return fltr.evaluate(new CacheEntryEvent(cache, evtType, 
entry));
-//            }
-//            catch (Exception e) {
-//                LT.warn(ignite.log(), e, "Cache entry event filter error: " 
+ e);
-//
-//                return true;
-//            }
-        }
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, 
? extends V>> evts) {
+            for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
+                switch (evt.getEventType()) {
+                    case CREATED:
+                        assert impl instanceof CacheEntryCreatedListener;
 
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeBoolean(create);
+                        ((CacheEntryCreatedListener<K, 
V>)impl).onCreated(singleton(evt));
 
-            out.writeBoolean(update);
+                        break;
 
-            out.writeBoolean(rmv);
+                    case UPDATED:
+                        assert impl instanceof CacheEntryUpdatedListener;
 
-            out.writeBoolean(expire);
+                        ((CacheEntryUpdatedListener<K, 
V>)impl).onUpdated(singleton(evt));
 
-            U.writeString(out, cacheName);
+                        break;
 
-            out.writeObject(fltrFactory);
-        }
+                    case REMOVED:
+                        assert impl instanceof CacheEntryRemovedListener;
 
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            create = in.readBoolean();
+                        ((CacheEntryRemovedListener<K, 
V>)impl).onRemoved(singleton(evt));
 
-            update = in.readBoolean();
+                        break;
 
-            rmv = in.readBoolean();
+                    case EXPIRED:
+                        assert impl instanceof CacheEntryExpiredListener;
 
-            expire = in.readBoolean();
+                        ((CacheEntryExpiredListener<K, 
V>)impl).onExpired(singleton(evt));
 
-            cacheName = U.readString(in);
+                        break;
 
-            fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super 
V1>>)in.readObject();
+                    default:
+                        throw new IllegalStateException("Unknown type: " + 
evt.getEventType());
+                }
+            }
+        }
 
-            if (fltrFactory != null)
-                fltr = fltrFactory.create();
+        /**
+         * @param evt Event.
+         * @return Singleton iterable.
+         */
+        private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton(
+            CacheEntryEvent<? extends K, ? extends V> evt) {
+            Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new 
ArrayList<>(1);
+
+            evts.add(evt);
+
+            return evts;
         }
     }
 
     /**
-     *
      */
-    private class EntryListenerCallback implements
-        IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> {
-        /** */
-        private final IgniteCacheProxy<K, V> cache;
-
-        /** */
-        private final CacheEntryCreatedListener createLsnr;
-
+    private static class JCacheQueryRemoteFilter<K, V> implements 
CacheEntryEventFilter<K, V>, Externalizable {
         /** */
-        private final CacheEntryUpdatedListener updateLsnr;
+        private CacheEntryEventFilter<K, V> impl;
 
         /** */
-        private final CacheEntryRemovedListener rmvLsnr;
-
-        /** */
-        private final CacheEntryExpiredListener expireLsnr;
+        private byte types;
 
         /**
-         * @param cache Cache to be used as event source.
-         * @param lsnr Listener.
+         * For {@link Externalizable}.
          */
-        EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener 
lsnr) {
-            this.cache = cache;
-
-            createLsnr = lsnr instanceof CacheEntryCreatedListener ? 
(CacheEntryCreatedListener)lsnr : null;
-            updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? 
(CacheEntryUpdatedListener)lsnr : null;
-            rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? 
(CacheEntryRemovedListener)lsnr : null;
-            expireLsnr = lsnr instanceof CacheEntryExpiredListener ? 
(CacheEntryExpiredListener)lsnr : null;
+        public JCacheQueryRemoteFilter() {
+            // no-op.
         }
 
         /**
-         * @return {@code True} if listens for create event.
+         * @param impl Filter.
+         * @param types Types.
          */
-        boolean create() {
-            return createLsnr != null;
-        }
+        JCacheQueryRemoteFilter(CacheEntryEventFilter<K, V> impl, byte types) {
+            assert types != 0;
 
-        /**
-         * @return {@code True} if listens for update event.
-         */
-        boolean update() {
-            return updateLsnr != null;
+            this.impl = impl;
+            this.types = types;
         }
 
-        /**
-         * @return {@code True} if listens for remove event.
-         */
-        boolean remove() {
-            return rmvLsnr != null;
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends K, ? 
extends V> evt) {
+            return (types & flag(evt.getEventType())) != 0 && (impl == null || 
impl.evaluate(evt));
         }
 
-        /**
-         * @return {@code True} if listens for expire event.
-         */
-        boolean expire() {
-            return expireLsnr != null;
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeObject(impl);
+            out.writeByte(types);
         }
 
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
-        @Override public boolean apply(UUID uuid,
-            Collection<CacheContinuousQueryEntry<K, V>> entries) {
-//            for (CacheContinuousQueryEntry entry : entries) {
-//                try {
-//                    EventType evtType = 
(((GridCacheContinuousQueryEntry)entry).eventType());
-//
-//                    switch (evtType) {
-//                        case EXPIRED: {
-//                            assert expireLsnr != null;
-//
-//                            CacheEntryEvent evt0 =
-//                                new CacheEntryEvent(cache, EXPIRED, entry);
-//
-//                            
expireLsnr.onExpired(Collections.singleton(evt0));
-//
-//                            break;
-//                        }
-//
-//                        case REMOVED: {
-//                            assert rmvLsnr != null;
-//
-//                            CacheEntryEvent evt0 =
-//                                new CacheEntryEvent(cache, REMOVED, entry);
-//
-//                            rmvLsnr.onRemoved(Collections.singleton(evt0));
-//
-//                            break;
-//                        }
-//
-//                        case UPDATED: {
-//                            assert updateLsnr != null;
-//
-//                            CacheEntryEvent evt0 =
-//                                new CacheEntryEvent(cache, UPDATED, entry);
-//
-//                            
updateLsnr.onUpdated(Collections.singleton(evt0));
-//
-//                            break;
-//                        }
-//
-//                        case CREATED: {
-//                            assert createLsnr != null;
-//
-//                            CacheEntryEvent evt0 =
-//                                new CacheEntryEvent(cache, CREATED, entry);
-//
-//                            
createLsnr.onCreated(Collections.singleton(evt0));
-//
-//                            break;
-//                        }
-//
-//                        default:
-//                            assert false : evtType;
-//                    }
-//                }
-//                catch (CacheEntryListenerException e) {
-//                    LT.warn(log, e, "Cache entry listener error: " + e);
-//                }
-//            }
-
-            return true;
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            impl = (CacheEntryEventFilter<K, V>)in.readObject();
+            types = in.readByte();
+        }
+
+        /**
+         * @param evtType Type.
+         * @return Flag value.
+         */
+        private byte flag(EventType evtType) {
+            switch (evtType) {
+                case CREATED:
+                    return CREATED_FLAG;
+
+                case UPDATED:
+                    return UPDATED_FLAG;
+
+                case REMOVED:
+                    return REMOVED_FLAG;
+
+                case EXPIRED:
+                    return EXPIRED_FLAG;
+
+                default:
+                    throw new IllegalStateException("Unknown type: " + 
evtType);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 10f5d36..faa8012 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.service;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
@@ -41,8 +40,6 @@ import org.apache.ignite.thread.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.*;
-import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -89,11 +86,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
-    /** Deployment listener. */
-    private QueryCursor<Cache.Entry<Object, Object>> cfgQryCur;
+    /** Deployment listener ID. */
+    private UUID cfgQryId;
 
-    /** Assignment listener. */
-    private QueryCursor<Cache.Entry<Object, Object>> assignQryCur;
+    /** Assignment listener ID. */
+    private UUID assignQryId;
 
     /**
      * @param ctx Kernal context.
@@ -122,7 +119,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (ctx.isDaemon())
             return;
 
-        cache = (GridCacheProjectionEx<Object, Object>)ctx.cache().utilityCache();
+        cache = ctx.cache().utilityCache();
 
         ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
 
@@ -130,19 +127,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            IgniteCache<Object, Object> jCache = ctx.cache().utilityJCache();
-
-            ContinuousQuery<Object, Object> cfgQry = Query.continuous();
-
-            cfgQry.setLocalListener(new DeploymentListener());
-
-            cfgQryCur = jCache.localQuery(cfgQry);
-
-            ContinuousQuery<Object, Object> assignQry = Query.continuous();
-
-            assignQry.setLocalListener(new AssignmentListener());
-
-            assignQryCur = jCache.localQuery(assignQry);
+            cfgQryId = cache.context().continuousQueries().executeInternalQuery(new DeploymentListener(), null, true);
+            assignQryId = cache.context().continuousQueries().executeInternalQuery(
+                new AssignmentListener(), null, true);
         }
         finally {
             if (ctx.deploy().enabled())
@@ -175,11 +162,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         ctx.event().removeLocalEventListener(topLsnr);
 
-        if (cfgQryCur != null)
-            cfgQryCur.close();
+        if (cfgQryId != null)
+            cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
 
-        if (assignQryCur != null)
-            assignQryCur.close();
+        if (assignQryId != null)
+            cache.context().continuousQueries().cancelInternalQuery(assignQryId);
 
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index da7f47b..6b2d302 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.jobtracker;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.*;
@@ -38,7 +37,6 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.event.*;
-import javax.cache.event.CacheEntryEvent;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.io.*;
@@ -84,6 +82,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /** Component busy lock. */
     private GridSpinReadWriteLock busyLock;
 
+    /** Job meta query ID. */
+    private UUID jobMetaQryId;
+
     /** Closure to check result of async transform of system cache. */
     private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
         @Override public void apply(IgniteInternalFuture<?> gridFut) {
@@ -138,8 +139,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                         ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
                             new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl()));
 
-                        finishedJobMetaPrj = ((GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)prj).
-                            withExpiryPolicy(finishedJobPlc);
+                        finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
                     }
                     else
                         finishedJobMetaPrj = jobMetaPrj;
@@ -172,30 +172,29 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     @Override public void onKernalStart() throws IgniteCheckedException {
         super.onKernalStart();
 
-        ContinuousQuery<GridHadoopJobId, GridHadoopJobMetadata> qry = Query.continuous();
+        jobMetaQryId = jobMetaCache().context().continuousQueries().executeInternalQuery(
+            new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() {
+                @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId,
+                    ? extends GridHadoopJobMetadata>> evts) {
+                    if (!busyLock.tryReadLock())
+                        return;
 
-        qry.setLocalListener(new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() {
-            @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId,
-                ? extends GridHadoopJobMetadata>> evts) {
-                if (!busyLock.tryReadLock())
-                    return;
-
-                try {
-                    // Must process query callback in a separate thread to avoid deadlocks.
-                    evtProcSvc.submit(new EventHandler() {
-                        @Override protected void body() throws IgniteCheckedException {
-                            processJobMetadataUpdates(evts);
-                        }
-                    });
-                }
-                finally {
-                    busyLock.readUnlock();
+                    try {
+                        // Must process query callback in a separate thread to avoid deadlocks.
+                        evtProcSvc.submit(new EventHandler() {
+                            @Override protected void body() throws IgniteCheckedException {
+                                processJobMetadataUpdates(evts);
+                            }
+                        });
+                    }
+                    finally {
+                        busyLock.readUnlock();
+                    }
                 }
-            }
-        });
-
-        // TODO: Filter by type
-        ctx.kernalContext().cache().utilityJCache().localQuery(qry);
+            },
+            null,
+            true
+        );
 
         ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(final Event evt) {
@@ -228,6 +227,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
         // Fail all pending futures.
         for (GridFutureAdapter<GridHadoopJobId> fut : activeFinishFuts.values())
             fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping)."));
+
+        jobMetaCache().context().continuousQueries().cancelInternalQuery(jobMetaQryId);
     }
 
     /**

Reply via email to