# Merged 6.6.3 fixes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a86ae903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a86ae903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a86ae903

Branch: refs/heads/ignite-160
Commit: a86ae903e337140ddd18e966921e0de9d70ae79f
Parents: 46160c9
Author: vozerov-gridgain <[email protected]>
Authored: Thu Feb 5 16:58:14 2015 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Thu Feb 5 16:58:14 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 22 +++----
 .../affinity/GridAffinityAssignmentCache.java   | 23 ++++++-
 .../processors/cache/GridCacheMapEntry.java     | 10 ++-
 .../GridCacheContinuousQueryAdapter.java        | 45 +++++++++-----
 .../GridCacheContinuousQueryHandler.java        | 15 ++++-
 .../continuous/GridContinuousProcessor.java     | 15 ++++-
 .../portable/GridPortableInputStream.java       |  7 +++
 ...dCacheContinuousQueryReplicatedSelfTest.java | 65 ++++++++++++++++++++
 .../hadoop/jobtracker/GridHadoopJobTracker.java |  2 +-
 9 files changed, 160 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 217a2aa..b612465e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1507,15 +1507,16 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         private void onSegmentation() {
             GridSegmentationPolicy segPlc = 
ctx.config().getSegmentationPolicy();
 
+            // Always disconnect first.
+            try {
+                getSpi().disconnect();
+            }
+            catch (IgniteSpiException e) {
+                U.error(log, "Failed to disconnect discovery SPI.", e);
+            }
+
             switch (segPlc) {
                 case RESTART_JVM:
-                    try {
-                        getSpi().disconnect();
-                    }
-                    catch (IgniteSpiException e) {
-                        U.error(log, "Failed to disconnect discovery SPI.", e);
-                    }
-
                     U.warn(log, "Restarting JVM according to configured 
segmentation policy.");
 
                     restartJvm();
@@ -1523,13 +1524,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     break;
 
                 case STOP:
-                    try {
-                        getSpi().disconnect();
-                    }
-                    catch (IgniteSpiException e) {
-                        U.error(log, "Failed to disconnect discovery SPI.", e);
-                    }
-
                     U.warn(log, "Stopping local node according to configured 
segmentation policy.");
 
                     stopNode();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index cc447ea..42c3b5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
@@ -35,6 +36,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+
 /**
  * Affinity cached function.
  */
@@ -121,6 +124,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version to calculate affinity cache for.
      * @param discoEvt Discovery event that caused this topology version 
change.
      */
+    @SuppressWarnings("IfMayBeConditional")
     public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent 
discoEvt) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", 
locNodeId=" + ctx.localNodeId() +
@@ -142,8 +146,23 @@ public class GridAffinityAssignmentCache {
 
         List<List<ClusterNode>> prevAssignment = prev == null ? null : 
prev.assignment();
 
-        List<List<ClusterNode>> assignment = aff.assignPartitions(
-            new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, 
discoEvt, topVer, backups));
+        List<List<ClusterNode>> assignment;
+
+        if (prevAssignment != null && discoEvt != null) {
+            CacheDistributionMode distroMode = 
U.distributionMode(discoEvt.eventNode(), ctx.name());
+
+            if (distroMode == null || // no cache on node.
+                distroMode == CLIENT_ONLY || distroMode == NEAR_ONLY)
+                assignment = prevAssignment;
+            else
+                assignment = aff.assignPartitions(new 
GridCacheAffinityFunctionContextImpl(sorted, prevAssignment,
+                    discoEvt, topVer, backups));
+        }
+        else
+            assignment = aff.assignPartitions(new 
GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt,
+                topVer, backups));
+
+        assert assignment != null;
 
         GridAffinityAssignment updated = new GridAffinityAssignment(topVer, 
assignment);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6aafc5d..96ceb93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1167,8 +1167,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
             CacheMode mode = cctx.config().getCacheMode();
 
-            if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
-                (tx != null && tx.local() && !isNear()))
+            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
                 cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, false);
@@ -1329,8 +1328,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
                 CacheMode mode = cctx.config().getCacheMode();
 
-                if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
-                    (tx != null && tx.local() && !isNear()))
+                if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
                     cctx.continuousQueries().onEntryUpdate(this, key, null, 
null, old, oldBytes, false);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
@@ -2144,7 +2142,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            if (primary || cctx.isReplicated())
+            if (primary)
                 cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
@@ -3143,7 +3141,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.isReplicated() || 
cctx.affinity().primary(cctx.localNode(), key, topVer)) {
+                    if (cctx.affinity().primary(cctx.localNode(), key, 
topVer)) {
                         cctx.continuousQueries().onEntryUpdate(this,
                             key,
                             val,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
index ba34d6b..acd96ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
@@ -35,7 +35,7 @@ import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
-import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
 
 /**
  * Continuous query implementation.
@@ -228,27 +228,39 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements CacheContinuousQue
         prj = prj.forCacheNodes(ctx.name());
 
         if (prj.nodes().isEmpty())
-            throw new ClusterTopologyCheckedException("Failed to execute query 
(projection is empty): " + this);
+            throw new ClusterTopologyCheckedException("Failed to continuous 
execute query (projection is empty): " +
+                this);
 
-        CacheMode mode = ctx.config().getCacheMode();
+        boolean skipPrimaryCheck = false;
 
-        if (mode == LOCAL || mode == REPLICATED) {
-            Collection<ClusterNode> nodes = prj.nodes();
+        Collection<ClusterNode> nodes = prj.nodes();
 
-            ClusterNode node = nodes.contains(ctx.localNode()) ? 
ctx.localNode() : F.rand(nodes);
+        if (nodes.isEmpty())
+            throw new ClusterTopologyCheckedException("Failed to execute 
continuous query (empty projection is " +
+                "provided): " + this);
 
-            assert node != null;
+        switch (ctx.config().getCacheMode()) {
+            case LOCAL:
+                if (!nodes.contains(ctx.localNode()))
+                    throw new ClusterTopologyCheckedException("Continuous 
query for LOCAL cache can be executed " +
+                        "only locally (provided projection contains remote 
nodes only): " + this);
+                else if (nodes.size() > 1)
+                    U.warn(log, "Continuous query for LOCAL cache will be 
executed locally (provided projection is " +
+                        "ignored): " + this);
 
-            if (nodes.size() > 1) {
-                if (node.id().equals(ctx.localNodeId()))
-                    U.warn(log, "Continuous query for " + mode + " cache can 
be run only on local node. " +
-                        "Will execute query locally: " + this);
-                else
-                    U.warn(log, "Continuous query for " + mode + " cache can 
be run only on single node. " +
-                        "Will execute query on remote node [qry=" + this + ", 
node=" + node + ']');
-            }
+                prj = prj.forNode(ctx.localNode());
 
-            prj = prj.forNode(node);
+                break;
+
+            case REPLICATED:
+                if (nodes.size() == 1 && 
F.first(nodes).equals(ctx.localNode())) {
+                    CacheDistributionMode distributionMode = 
ctx.config().getDistributionMode();
+
+                    if (distributionMode == PARTITIONED_ONLY || 
distributionMode == NEAR_PARTITIONED)
+                        skipPrimaryCheck = true;
+                }
+
+                break;
         }
 
         closeLock.lock();
@@ -271,6 +283,7 @@ public class GridCacheContinuousQueryAdapter<K, V> 
implements CacheContinuousQue
                 entryLsnr,
                 sync,
                 oldVal,
+                skipPrimaryCheck,
                 taskNameHash,
                 keepPortable);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
index 350b9b8..a03b9db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
@@ -84,6 +84,9 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
     /** Keep portable flag. */
     private boolean keepPortable;
 
+    /** Whether to skip primary check for REPLICATED cache. */
+    private transient boolean skipPrimaryCheck;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -103,6 +106,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
      * @param entryLsnr {@code True} if query created for {@link 
CacheEntryListener}.
      * @param sync {@code True} if query created for synchronous {@link 
CacheEntryListener}.
      * @param oldVal {@code True} if old value is required.
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED 
cache.
      * @param taskHash Task name hash code.
      */
     GridCacheContinuousQueryHandler(@Nullable String cacheName,
@@ -114,6 +118,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         boolean entryLsnr,
         boolean sync,
         boolean oldVal,
+        boolean skipPrimaryCheck,
         int taskHash,
         boolean keepPortable) {
         assert topic != null;
@@ -131,6 +136,7 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         this.oldVal = oldVal;
         this.taskHash = taskHash;
         this.keepPortable = keepPortable;
+        this.skipPrimaryCheck = skipPrimaryCheck;
     }
 
     /** {@inheritDoc} */
@@ -184,16 +190,21 @@ class GridCacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
             }
 
             @Override public void 
onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx.isReplicated() && !skipPrimaryCheck && !e.primary())
+                    return;
+
                 boolean notify;
 
-                CacheFlag[] f = cacheContext(ctx).forceLocalRead();
+                CacheFlag[] f = cctx.forceLocalRead();
 
                 try {
                     notify = (prjPred == null || checkProjection(e)) &&
                         (filter == null || filter.apply(e));
                 }
                 finally {
-                    cacheContext(ctx).forceFlags(f);
+                    cctx.forceFlags(f);
                 }
 
                 if (notify) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 93df61f..bc66a2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1276,7 +1276,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
          * @return Object to send or {@code null} if there is nothing to send 
for now.
          */
         @Nullable Collection<Object> add(@Nullable Object obj) {
-            Collection<Object> toSnd = null;
+            ConcurrentLinkedDeque8 buf0 = null;
 
             if (buf.sizex() >= bufSize - 1) {
                 lock.writeLock().lock();
@@ -1284,7 +1284,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 try {
                     buf.add(obj);
 
-                    toSnd = buf;
+                    buf0 = buf;
 
                     buf = new ConcurrentLinkedDeque8<>();
 
@@ -1306,7 +1306,16 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 }
             }
 
-            return toSnd != null ? new ArrayList<>(toSnd) : null;
+            Collection<Object> toSnd = null;
+
+            if (buf0 != null) {
+                toSnd = new ArrayList<>(buf0.sizex());
+
+                for (Object o : buf0)
+                    toSnd.add(o);
+            }
+
+            return toSnd;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
index 8ab16f2..3c676bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
@@ -174,4 +174,11 @@ public interface GridPortableInputStream extends 
GridPortableStream {
      * @return Remaining data.
      */
     public int remaining();
+
+    /**
+     * Length of data inside array.
+     *
+     * @param len Length.
+     */
+    public void length(int len);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
index 39dfda0..dddda8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.query.continuous;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -45,6 +46,7 @@ public class GridCacheContinuousQueryReplicatedSelfTest 
extends GridCacheContinu
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("unchecked")
     public void testRemoteNodeCallback() throws Exception {
         GridCache<Integer, Integer> cache1 = grid(0).cache(null);
 
@@ -79,4 +81,67 @@ public class GridCacheContinuousQueryReplicatedSelfTest 
extends GridCacheContinu
 
         assertEquals(10, val.get().intValue());
     }
+
+    /**
+     * Ensure that every node sees every update.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testCrossCallback() throws Exception {
+        // Prepare.
+        GridCache<Integer, Integer> cache1 = grid(0).cache(null);
+        GridCache<Integer, Integer> cache2 = grid(1).cache(null);
+
+        final int key1 = primaryKey(cache1);
+        final int key2 = primaryKey(cache2);
+
+        final CountDownLatch latch1 = new CountDownLatch(2);
+        final CountDownLatch latch2 = new CountDownLatch(2);
+
+
+        // Start query on the first node.
+        CacheContinuousQuery<Integer, Integer> qry1 = 
cache1.queries().createContinuousQuery();
+
+        qry1.localCallback(new IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
+            @Override public boolean apply(UUID nodeID,
+                Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
+                for (CacheContinuousQueryEntry entry : entries) {
+                    log.info("Update in cache 1: " + entry);
+
+                    if (entry.getKey() == key1 || entry.getKey() == key2)
+                        latch1.countDown();
+                }
+
+                return latch1.getCount() != 0;
+            }
+        });
+
+        qry1.execute();
+
+        // Start query on the second node.
+        CacheContinuousQuery<Integer, Integer> qry2 = 
cache2.queries().createContinuousQuery();
+
+        qry2.localCallback(new IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
+            @Override public boolean apply(UUID nodeID,
+                Collection<CacheContinuousQueryEntry<Integer, Integer>> 
entries) {
+                for (CacheContinuousQueryEntry entry : entries) {
+                    log.info("Update in cache 2: " + entry);
+
+                    if (entry.getKey() == key1 || entry.getKey() == key2)
+                        latch2.countDown();
+                }
+
+                return latch2.getCount() != 0;
+            }
+        });
+
+        qry2.execute();
+
+        cache1.put(key1, key1);
+        cache1.put(key2, key2);
+
+        assert latch1.await(LATCH_TIMEOUT, MILLISECONDS);
+        assert latch2.await(LATCH_TIMEOUT, MILLISECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index 1b9b4cb..2a771b8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -194,7 +194,7 @@ public class GridHadoopJobTracker extends 
GridHadoopComponent {
                 }
             });
 
-        qry.execute();
+        qry.execute(ctx.kernalContext().grid().forLocal());
 
         ctx.kernalContext().event().addLocalEventListener(new 
GridLocalEventListener() {
             @Override public void onEvent(final IgniteEvent evt) {

Reply via email to