http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
new file mode 100644
index 0000000..308316d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -0,0 +1,1744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
+import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
+
+/**
+ *
+ */
+public class GridClosureProcessor extends GridProcessorAdapter {
+    /** Executor taken from {@code getSystemExecutorService()} of the node configuration. */
+    private final Executor sysPool;
+
+    /** Executor taken from {@code getExecutorService()} of the node configuration (the "public" pool). */
+    private final Executor pubPool;
+
+    /** Executor taken from {@code getGgfsExecutorService()} of the node configuration. */
+    private final Executor ggfsPool;
+
+    /** Lock to control execution after stop. */
+    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+    /** Number of locally submitted closure workers whose body has not finished yet. */
+    private final LongAdder workersCnt = new LongAdder();
+
+    /**
+     * Creates the closure processor and captures the system, public and GGFS executors
+     * from the node configuration ({@code ctx.config()}).
+     *
+     * @param ctx Kernal context.
+     */
+    public GridClosureProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        sysPool = ctx.config().getSystemExecutorService();
+        pubPool = ctx.config().getExecutorService();
+        ggfsPool = ctx.config().getGgfsExecutorService();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        // Nothing to initialize here; the executors are captured in the constructor.
+        if (log.isDebugEnabled())
+            log.debug("Started closure processor.");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Acquires the busy write lock (it is not released in this method) and then polls every
+     * 200 ms until the count of in-flight local workers drops to zero. An interrupt observed
+     * before or during the wait is remembered and the interrupt flag is restored on exit.
+     */
+    @SuppressWarnings("BusyWait")
+    @Override public void onKernalStop(boolean cancel) {
+        busyLock.writeLock();
+
+        // Clear the flag up front so that sleep() below is not aborted right away.
+        boolean restoreInterrupt = Thread.interrupted();
+
+        for (;;) {
+            if (workersCnt.sum() == 0)
+                break;
+
+            try {
+                Thread.sleep(200);
+            }
+            catch (InterruptedException ignored) {
+                restoreInterrupt = true;
+            }
+        }
+
+        if (restoreInterrupt)
+            Thread.currentThread().interrupt();
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped closure processor.");
+    }
+
+    /**
+     * Tries to take the busy read lock. Every successful call must be paired with
+     * {@link #leaveBusy()}.
+     *
+     * @throws IllegalStateException If the read lock could not be taken, which is reported
+     *      as the grid being stopped.
+     */
+    private void enterBusy() throws IllegalStateException {
+        if (!busyLock.tryReadLock())
+            throw new IllegalStateException("Closure processor cannot be used 
on stopped grid: " + ctx.gridName());
+    }
+
+    /**
+     * Releases the busy read lock taken by {@link #enterBusy()}.
+     */
+    private void leaveBusy() {
+        busyLock.readUnlock();
+    }
+
+    /**
+     * Shorthand for {@link #runAsync(GridClosureCallMode, Collection, Collection, boolean)}
+     * with {@code sys} set to {@code false}.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param nodes Grid nodes.
+     * @return Task execution future.
+     */
+    public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable 
Collection<? extends Runnable> jobs,
+        @Nullable Collection<ClusterNode> nodes) {
+        return runAsync(mode, jobs, nodes, false);
+    }
+
+    /**
+     * Executes the given closures on the given nodes as a single task.
+     * <p>
+     * If {@code jobs} is empty (per {@code F.isEmpty}) an already finished future is returned;
+     * if {@code nodes} is empty the returned future is finished with the empty-topology exception.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param nodes Grid nodes.
+     * @param sys If {@code true}, then system pool will be used.
+     * @return Task execution future.
+     * @throws IllegalStateException If grid is stopped.
+     */
+    public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
+        @Nullable Collection<ClusterNode> nodes, boolean sys) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            // Diamond instead of the raw type: same erasure, no unchecked warning.
+            if (F.isEmpty(jobs))
+                return new GridFinishedFuture<>(ctx);
+
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+
+            // Hand the target nodes to the task processor through the thread context.
+            ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+            return ctx.task().execute(new T1(mode, jobs), null, sys);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Shorthand for {@link #runAsync(GridClosureCallMode, Runnable, Collection, boolean)}
+     * with {@code sys} set to {@code false}.
+     *
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @return Task execution future.
+     */
+    public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable 
Runnable job,
+        @Nullable Collection<ClusterNode> nodes) {
+        return runAsync(mode, job, nodes, false);
+    }
+
+    /**
+     * Executes a single closure on the given nodes as a task.
+     * <p>
+     * A {@code null} closure yields an already finished future; an empty node collection
+     * (per {@code F.isEmpty}) yields a future finished with the empty-topology exception.
+     *
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @param sys If {@code true}, then system pool will be used.
+     * @return Task execution future.
+     * @throws IllegalStateException If grid is stopped.
+     */
+    public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job,
+        @Nullable Collection<ClusterNode> nodes, boolean sys) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            // Diamond instead of the raw type: same erasure, no unchecked warning.
+            if (job == null)
+                return new GridFinishedFuture<>(ctx);
+
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+
+            // Hand the target nodes to the task processor through the thread context.
+            ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+            return ctx.task().execute(new T2(mode, job), null, sys);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Builds the job-to-node mapping for {@link Runnable} closures according to the distribution mode.
+     * <p>
+     * For {@code BROADCAST} every closure is handed to the mapper once per node; for {@code BALANCE}
+     * each closure is handed to the mapper together with the node returned by the load balancer.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to map.
+     * @param nodes Grid nodes.
+     * @param lb Load balancer.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     * @return Mapping, or an empty map if there are no closures or no nodes.
+     */
+    private Map<ComputeJob, ClusterNode> absMap(GridClosureCallMode mode, Collection<? extends Runnable> jobs,
+        Collection<ClusterNode> nodes, ComputeLoadBalancer lb) throws IgniteCheckedException {
+        assert mode != null;
+        assert jobs != null;
+        assert nodes != null;
+        assert lb != null;
+
+        // Nothing to map.
+        if (F.isEmpty(jobs) || F.isEmpty(nodes))
+            return Collections.emptyMap();
+
+        Map<ComputeJob, ClusterNode> res = new HashMap<>(jobs.size(), 1);
+
+        JobMapper jobMapper = new JobMapper(res);
+
+        switch (mode) {
+            case BROADCAST: {
+                for (ClusterNode node : nodes) {
+                    for (Runnable run : jobs)
+                        jobMapper.map(job(run), node);
+                }
+
+                break;
+            }
+
+            case BALANCE: {
+                for (Runnable run : jobs) {
+                    ComputeJob cmpJob = job(run);
+
+                    jobMapper.map(cmpJob, lb.getBalancedNode(cmpJob, null));
+                }
+
+                break;
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Builds the job-to-node mapping for {@link Callable} closures according to the distribution mode.
+     * <p>
+     * For {@code BROADCAST} every closure is handed to the mapper once per node; for {@code BALANCE}
+     * each closure is handed to the mapper together with the node returned by the load balancer.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to map.
+     * @param nodes Grid nodes.
+     * @param lb Load balancer.
+     * @param <R> Type of the closure result.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     * @return Mapping, or an empty map if there are no closures or no nodes.
+     */
+    private <R> Map<ComputeJob, ClusterNode> outMap(GridClosureCallMode mode,
+        Collection<? extends Callable<R>> jobs, Collection<ClusterNode> nodes, ComputeLoadBalancer lb)
+        throws IgniteCheckedException {
+        assert mode != null;
+        assert jobs != null;
+        assert nodes != null;
+        assert lb != null;
+
+        // Nothing to map.
+        if (F.isEmpty(jobs) || F.isEmpty(nodes))
+            return Collections.emptyMap();
+
+        Map<ComputeJob, ClusterNode> res = new HashMap<>(jobs.size(), 1);
+
+        JobMapper jobMapper = new JobMapper(res);
+
+        switch (mode) {
+            case BROADCAST: {
+                for (ClusterNode node : nodes) {
+                    for (Callable<R> call : jobs)
+                        jobMapper.map(job(call), node);
+                }
+
+                break;
+            }
+
+            case BALANCE: {
+                for (Callable<R> call : jobs) {
+                    ComputeJob cmpJob = job(call);
+
+                    jobMapper.map(cmpJob, lb.getBalancedNode(cmpJob, null));
+                }
+
+                break;
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Executes the given callables on the given nodes and reduces their results with {@code rdc}.
+     * <p>
+     * If there are no jobs or no reducer, an already finished future is returned; if there are
+     * no nodes, the returned future is finished with the empty-topology exception.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param rdc Reducer.
+     * @param nodes Grid nodes.
+     * @param <R1> Type of a single closure result.
+     * @param <R2> Type of the reduced result.
+     * @return Future for the reduced result.
+     */
+    public <R1, R2> IgniteFuture<R2> forkjoinAsync(GridClosureCallMode mode,
+        @Nullable Collection<? extends Callable<R1>> jobs,
+        @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            final boolean nothingToDo = F.isEmpty(jobs) || rdc == null;
+
+            if (nothingToDo)
+                return new GridFinishedFuture<>(ctx);
+            else if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T3<>(mode, jobs, rdc), null);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Shorthand for {@link #callAsync(GridClosureCallMode, Collection, Collection, boolean)}
+     * with {@code sys} set to {@code false}.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param nodes Grid nodes.
+     * @param <R> Type.
+     * @return Grid future for collection of closure results.
+     */
+    public <R> IgniteFuture<Collection<R>> callAsync(
+        GridClosureCallMode mode,
+        @Nullable Collection<? extends Callable<R>> jobs,
+        @Nullable Collection<ClusterNode> nodes) {
+        return callAsync(mode, jobs, nodes, false);
+    }
+
+    /**
+     * Executes the given callables on the given nodes and collects their results.
+     * <p>
+     * An empty job collection yields an already finished future; an empty node collection
+     * yields a future finished with the empty-topology exception.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param nodes Grid nodes.
+     * @param sys If {@code true}, then system pool will be used.
+     * @param <R> Type of a single closure result.
+     * @return Grid future for collection of closure results.
+     */
+    public <R> IgniteFuture<Collection<R>> callAsync(GridClosureCallMode mode,
+        @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes,
+        boolean sys) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            if (F.isEmpty(jobs))
+                return new GridFinishedFuture<>(ctx);
+            else if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T6<>(mode, jobs), null, sys);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Shorthand for {@link #callAsync(GridClosureCallMode, Callable, Collection, boolean)}
+     * with {@code sys} set to {@code false}.
+     *
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @param <R> Type.
+     * @return Grid future for the closure result.
+     */
+    public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode,
+        @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
+        return callAsync(mode, job, nodes, false);
+    }
+
+    /**
+     * Executes a callable as an affinity task for the given cache and key.
+     * <p>
+     * The key is first translated through {@code ctx.affinity().affinityKey(...)}; a checked
+     * failure at any step is returned as a failed future rather than thrown.
+     *
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @param job Job.
+     * @param nodes Grid nodes.
+     * @param <R> Type of the job result.
+     * @return Job future.
+     */
+    public <R> IgniteFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
+        @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                // In case cache key is passed instead of affinity key.
+                final Object resolvedKey = ctx.affinity().affinityKey(cacheName, affKey);
+
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T5<>(cacheName, resolvedKey, job), null, false);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a runnable as an affinity task for the given cache and key.
+     * <p>
+     * The key is first translated through {@code ctx.affinity().affinityKey(...)}; a checked
+     * failure at any step is returned as a failed future rather than thrown.
+     *
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @param job Job.
+     * @param nodes Grid nodes.
+     * @return Job future.
+     */
+    public IgniteFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
+        @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                // In case cache key is passed instead of affinity key.
+                final Object resolvedKey = ctx.affinity().affinityKey(cacheName, affKey);
+
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T4(cacheName, resolvedKey, job), null, false);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a single callable on the given nodes with the {@code TC_NO_FAILOVER} flag set
+     * in the task thread context.
+     *
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @param sys If {@code true}, then system pool will be used.
+     * @param <R> Type of the closure result.
+     * @return Grid future for the closure result.
+     */
+    public <R> IgniteFuture<R> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Callable<R> job,
+        @Nullable Collection<ClusterNode> nodes, boolean sys) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            if (job == null)
+                return new GridFinishedFuture<>(ctx);
+            else if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_NO_FAILOVER, true);
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T7<>(mode, job), null, sys);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes the given callables on the given nodes with the {@code TC_NO_FAILOVER} flag set
+     * in the task thread context.
+     *
+     * @param mode Distribution mode.
+     * @param jobs Closures to execute.
+     * @param nodes Grid nodes.
+     * @param sys If {@code true}, then system pool will be used.
+     * @param <R> Type of a single closure result.
+     * @return Grid future for collection of closure results.
+     */
+    public <R> IgniteFuture<Collection<R>> callAsyncNoFailover(GridClosureCallMode mode,
+        @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes,
+        boolean sys) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            if (F.isEmpty(jobs))
+                return new GridFinishedFuture<>(ctx);
+            else if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_NO_FAILOVER, true);
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T6<>(mode, jobs), null, sys);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a single callable on the given nodes.
+     * <p>
+     * A {@code null} closure yields an already finished future; an empty node collection
+     * yields a future finished with the empty-topology exception.
+     *
+     * @param mode Distribution mode.
+     * @param job Closure to execute.
+     * @param nodes Grid nodes.
+     * @param sys If {@code true}, then system pool will be used.
+     * @param <R> Type of the closure result.
+     * @return Grid future for the closure result.
+     */
+    public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode,
+        @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) {
+        assert mode != null;
+
+        enterBusy();
+
+        try {
+            if (job == null)
+                return new GridFinishedFuture<>(ctx);
+            else if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T7<>(mode, job), null, sys);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a closure with a single argument on the given nodes (internal task {@code T8}).
+     *
+     * @param job Job closure.
+     * @param arg Optional job argument.
+     * @param nodes Grid nodes.
+     * @param <T> Type of the closure argument.
+     * @param <R> Type of the closure result.
+     * @return Grid future for execution result.
+     */
+    public <T, R> IgniteFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
+        @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T8<>(job, arg), null, false);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a closure with a single argument through internal task {@code T11}, which also
+     * receives the node collection.
+     *
+     * @param job Job closure.
+     * @param arg Optional job argument.
+     * @param nodes Grid nodes.
+     * @param <T> Type of the closure argument.
+     * @param <R> Type of the closure result.
+     * @return Grid future for collection of execution results.
+     */
+    public <T, R> IgniteFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg,
+        @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T11<>(job, arg, nodes), null, false);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Same as {@link #broadcast(IgniteClosure, Object, Collection)} but additionally sets the
+     * {@code TC_NO_FAILOVER} flag in the task thread context.
+     *
+     * @param job Job closure.
+     * @param arg Optional job argument.
+     * @param nodes Grid nodes.
+     * @param <T> Type of the closure argument.
+     * @param <R> Type of the closure result.
+     * @return Grid future for collection of execution results.
+     */
+    public <T, R> IgniteFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg,
+        @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+                ctx.task().setThreadContext(TC_NO_FAILOVER, true);
+
+                return ctx.task().execute(new T11<>(job, arg, nodes), null, false);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a closure over a collection of arguments on the given nodes (internal task {@code T9}).
+     *
+     * @param job Job closure.
+     * @param args Job arguments.
+     * @param nodes Grid nodes.
+     * @param <T> Type of the closure argument.
+     * @param <R> Type of the closure result.
+     * @return Grid future for collection of execution results.
+     */
+    public <T, R> IgniteFuture<Collection<R>> callAsync(IgniteClosure<T, R> job,
+        @Nullable Collection<? extends T> args, @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T9<>(job, args), null, false);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes a closure over a collection of arguments on the given nodes and reduces the
+     * results with {@code rdc} (internal task {@code T10}).
+     *
+     * @param job Job closure.
+     * @param args Job arguments.
+     * @param rdc Reducer.
+     * @param nodes Grid nodes.
+     * @param <T> Type of the closure argument.
+     * @param <R1> Type of a single closure result.
+     * @param <R2> Type of the reduced result.
+     * @return Grid future for execution result.
+     */
+    public <T, R1, R2> IgniteFuture<R2> callAsync(IgniteClosure<T, R1> job,
+        Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
+        enterBusy();
+
+        try {
+            if (F.isEmpty(nodes))
+                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+            else {
+                ctx.task().setThreadContext(TC_SUBGRID, nodes);
+
+                return ctx.task().execute(new T10<>(job, args, rdc), null, false);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Gets pool by execution policy.
+     *
+     * @param plc Whether to get system or public pool.
+     * @return Requested worker pool.
+     */
+    private Executor pool(GridClosurePolicy plc) {
+        switch (plc) {
+            case PUBLIC_POOL:
+                return pubPool;
+
+            case SYSTEM_POOL:
+                return sysPool;
+
+            case GGFS_POOL:
+                return ggfsPool;
+
+            default:
+                throw new IllegalArgumentException("Invalid closure execution 
policy: " + plc);
+        }
+    }
+
+    /**
+     * Gets pool name by execution policy.
+     *
+     * @param plc Policy to choose executor pool.
+     * @return Pool name.
+     */
+    private String poolName(GridClosurePolicy plc) {
+        switch (plc) {
+            case PUBLIC_POOL:
+                return "public";
+
+            case SYSTEM_POOL:
+                return "system";
+
+            case GGFS_POOL:
+                return "ggfs";
+
+            default:
+                throw new IllegalArgumentException("Invalid closure execution 
policy: " + plc);
+        }
+    }
+
+    /**
+     * Translates the {@code sys} flag into a {@code GridClosurePolicy} and delegates to
+     * {@link #runLocal(Runnable, GridClosurePolicy)}.
+     *
+     * @param c Closure to execute.
+     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
+     * @return Future.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    private IgniteFuture<?> runLocal(@Nullable final Runnable c, boolean sys) 
throws IgniteCheckedException {
+        return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+    }
+
+    /**
+     * Runs the closure locally on the pool selected by the policy.
+     * <p>
+     * The closure is wrapped into a {@code GridWorker} that completes the returned future when
+     * the closure returns or throws. If the executor rejects the worker, the rejection is logged
+     * and the worker is run synchronously in the calling thread.
+     *
+     * @param c Closure to execute; for {@code null} an already finished future is returned.
+     * @param plc Policy to choose executor pool.
+     * @return Future.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     * @throws IllegalStateException If grid is stopped.
+     * @throws IllegalArgumentException If the policy is not one of the known values.
+     */
+    private IgniteFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc)
+        throws IgniteCheckedException {
+        if (c == null)
+            return new GridFinishedFuture<>(ctx);
+
+        enterBusy();
+
+        try {
+            // Inject only if needed.
+            if (!(c instanceof GridPlainRunnable))
+                ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c);
+
+            // Resolve the executor BEFORE the worker is counted. pool() throws for an invalid policy,
+            // and a counter incremented earlier would never be decremented, so onKernalStop() would
+            // wait for the phantom worker forever.
+            final Executor exec = pool(plc);
+
+            final ClassLoader ldr = Thread.currentThread().getContextClassLoader();
+
+            final GridWorkerFuture fut = new GridWorkerFuture(ctx);
+
+            workersCnt.increment();
+
+            GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) {
+                @Override protected void body() {
+                    try {
+                        // Run the closure through wrapThreadLoader() when the submitter had a context loader.
+                        if (ldr != null)
+                            U.wrapThreadLoader(ldr, c);
+                        else
+                            c.run();
+
+                        fut.onDone();
+                    }
+                    catch (Throwable e) {
+                        if (e instanceof Error)
+                            U.error(log, "Closure execution failed with error.", e);
+
+                        fut.onDone(U.cast(e));
+                    }
+                    finally {
+                        // Paired with the increment above; onKernalStop() waits for this.
+                        workersCnt.decrement();
+                    }
+                }
+            };
+
+            fut.setWorker(w);
+
+            try {
+                exec.execute(w);
+            }
+            catch (RejectedExecutionException e) {
+                U.error(log, "Failed to execute worker due to execution rejection " +
+                    "(increase upper bound on " + poolName(plc) + " executor service).", e);
+
+                // Fall back to running in the caller's thread.
+                w.run();
+            }
+
+            return fut;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but
+     * in case of rejected execution re-runs the closure in the current thread (blocking).
+     * Delegates to {@link #runLocalSafe(Runnable, boolean)} with {@code sys} set to {@code true}.
+     *
+     * @param c Closure to execute.
+     * @return Future.
+     */
+    public IgniteFuture<?> runLocalSafe(Runnable c) {
+        return runLocalSafe(c, true);
+    }
+
+    /**
+     * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs
+     * the closure in the current thread (blocking). Translates the {@code sys} flag into a
+     * {@code GridClosurePolicy} and delegates to {@link #runLocalSafe(Runnable, GridClosurePolicy)}.
+     *
+     * @param c Closure to execute.
+     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
+     * @return Future.
+     */
+    public IgniteFuture<?> runLocalSafe(Runnable c, boolean sys) {
+        return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+    }
+
+    /**
+     * Companion to {@link #runLocal(Runnable, boolean)} that never throws: any failure is reported
+     * through the returned future. If the failure was caused by a {@link RejectedExecutionException},
+     * the closure is re-run in the current thread (blocking).
+     *
+     * @param c Closure to execute.
+     * @param plc Policy to choose executor pool.
+     * @return Future.
+     */
+    public IgniteFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) {
+        try {
+            return runLocal(c, plc);
+        }
+        catch (Throwable e) {
+            if (e instanceof Error)
+                U.error(log, "Closure execution failed with error.", e);
+
+            // If execution was rejected - rerun locally.
+            if (e.getCause() instanceof RejectedExecutionException) {
+                U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc +
+                    ", closure=" + c + ']');
+
+                try {
+                    c.run();
+
+                    // Diamond instead of the raw type: same erasure, no unchecked warning.
+                    return new GridFinishedFuture<>(ctx);
+                }
+                catch (Throwable t) {
+                    if (t instanceof Error)
+                        U.error(log, "Closure execution failed with error.", t);
+
+                    return new GridFinishedFuture<>(ctx, U.cast(t));
+                }
+            }
+            // If failed for other reasons - return error future.
+            else
+                return new GridFinishedFuture<>(ctx, U.cast(e));
+        }
+    }
+
+    /**
+     * Translates the {@code sys} flag into a {@code GridClosurePolicy} and delegates to
+     * {@link #callLocal(Callable, GridClosurePolicy)}.
+     *
+     * @param c Closure to execute.
+     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
+     * @param <R> Type of closure return value.
+     * @return Future.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, 
boolean sys) throws IgniteCheckedException {
+        return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+    }
+
+    /**
+     * Calls the closure locally on the pool selected by the policy.
+     * <p>
+     * The closure is wrapped into a {@code GridWorker} that completes the returned future with
+     * the closure's result or failure. If the executor rejects the worker, the rejection is logged
+     * and the worker is run synchronously in the calling thread.
+     *
+     * @param c Closure to execute; for {@code null} an already finished future is returned.
+     * @param plc Policy to choose executor pool.
+     * @param <R> Type of closure return value.
+     * @return Future.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     * @throws IllegalStateException If grid is stopped.
+     * @throws IllegalArgumentException If the policy is not one of the known values.
+     */
+    private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc)
+        throws IgniteCheckedException {
+        if (c == null)
+            return new GridFinishedFuture<>(ctx);
+
+        enterBusy();
+
+        try {
+            // Inject only if needed.
+            if (!(c instanceof GridPlainCallable))
+                ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c);
+
+            // Resolve the executor BEFORE the worker is counted. pool() throws for an invalid policy,
+            // and a counter incremented earlier would never be decremented, so onKernalStop() would
+            // wait for the phantom worker forever.
+            final Executor exec = pool(plc);
+
+            final ClassLoader ldr = Thread.currentThread().getContextClassLoader();
+
+            final GridWorkerFuture<R> fut = new GridWorkerFuture<>(ctx);
+
+            workersCnt.increment();
+
+            GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) {
+                @Override protected void body() {
+                    try {
+                        // Call the closure through wrapThreadLoader() when the submitter had a context loader.
+                        if (ldr != null)
+                            fut.onDone(U.wrapThreadLoader(ldr, c));
+                        else
+                            fut.onDone(c.call());
+                    }
+                    catch (Throwable e) {
+                        if (e instanceof Error)
+                            U.error(log, "Closure execution failed with error.", e);
+
+                        fut.onDone(U.cast(e));
+                    }
+                    finally {
+                        // Paired with the increment above; onKernalStop() waits for this.
+                        workersCnt.decrement();
+                    }
+                }
+            };
+
+            fut.setWorker(w);
+
+            try {
+                exec.execute(w);
+            }
+            catch (RejectedExecutionException e) {
+                U.error(log, "Failed to execute worker due to execution rejection " +
+                    "(increase upper bound on " + poolName(plc) + " executor service).", e);
+
+                // Fall back to running in the caller's thread.
+                w.run();
+            }
+
+            return fut;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
+     * but in case of rejected execution re-runs the closure in the current thread (blocking).
+     * Delegates to {@link #callLocalSafe(Callable, boolean)} with {@code sys} set to {@code true}.
+     *
+     * @param c Closure to execute.
+     * @param <R> Type of closure return value.
+     * @return Future.
+     */
+    public <R> IgniteFuture<R> callLocalSafe(Callable<R> c) {
+        return callLocalSafe(c, true);
+    }
+
+    /**
+     * Executes closure on the system or public pool, depending on {@code sys}. Companion to
+     * {@link #callLocal(Callable, boolean)} but in case of rejected execution re-runs the closure
+     * in the current thread (blocking). Delegates to {@link #callLocalSafe(Callable, GridClosurePolicy)}.
+     *
+     * @param c Closure to execute.
+     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
+     * @param <R> Type of closure return value.
+     * @return Future.
+     */
+    public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, boolean sys) {
+        return callLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+    }
+
+    /**
+     * Variant of {@link #callLocal(Callable, boolean)} that never throws: if the
+     * execution is rejected, the closure is run in the calling thread (blocking),
+     * and every failure is delivered through the returned future.
+     *
+     * @param c Closure to execute.
+     * @param plc Policy to choose executor pool.
+     * @return Future.
+     */
+    public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) {
+        try {
+            return callLocal(c, plc);
+        }
+        catch (IgniteCheckedException e) {
+            // Any failure other than a rejection goes straight into an error future.
+            if (!(e.getCause() instanceof RejectedExecutionException))
+                return new GridFinishedFuture<>(ctx, U.cast(e));
+
+            U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc +
+                ", closure=" + c + ']');
+
+            // Rejected by the pool - run the closure right here.
+            try {
+                return new GridFinishedFuture<>(ctx, c.call());
+            }
+            catch (Exception e2) {
+                // The local run failed as well - report it through an error future.
+                return new GridFinishedFuture<>(ctx, U.cast(e2));
+            }
+        }
+    }
+
+    /**
+     * Wraps a closure and its argument into a compute job whose {@code execute()}
+     * applies the closure to the argument.
+     * <p>
+     * If the closure implements {@link ComputeJobMasterLeaveAware}, the resulting job
+     * forwards {@code onMasterNodeLeft} calls to it.
+     *
+     * @param job Closure to wrap.
+     * @param arg Optional argument.
+     * @return Job.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) {
+        A.notNull(job, "job");
+
+        if (job instanceof ComputeJobMasterLeaveAware) {
+            return new GridMasterLeaveAwareComputeJobAdapter() {
+                @Nullable @Override public Object execute() {
+                    return job.apply(arg);
+                }
+
+                @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException {
+                    ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses);
+                }
+            };
+        }
+
+        // Plain closure: no master-leave callback to forward.
+        return new ComputeJobAdapter() {
+            @Nullable @Override public Object execute() {
+                return job.apply(arg);
+            }
+        };
+    }
+
+    /**
+     * Wraps a callable into a compute job. The job's {@code execute()} returns the
+     * callable's result; any {@link Exception} thrown by the callable is rethrown
+     * wrapped in {@link IgniteException}.
+     * <p>
+     * If the callable implements {@link ComputeJobMasterLeaveAware}, the resulting job
+     * forwards {@code onMasterNodeLeft} calls to it.
+     *
+     * @param c Closure to convert to grid job.
+     * @return Grid job made out of closure.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private ComputeJob job(final Callable<?> c) {
+        A.notNull(c, "job");
+
+        if (c instanceof ComputeJobMasterLeaveAware) {
+            return new GridMasterLeaveAwareComputeJobAdapter() {
+                @Override public Object execute() {
+                    try {
+                        return c.call();
+                    }
+                    catch (Exception e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException {
+                    ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
+                }
+            };
+        }
+
+        // Plain callable: no master-leave callback to forward.
+        return new ComputeJobAdapter() {
+            @Override public Object execute() {
+                try {
+                    return c.call();
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Converts given closure to a grid job that also carries a cache name and an
+     * affinity key.
+     * <p>
+     * The returned job's {@code execute()} invokes {@code c.call()} and rethrows any
+     * {@link Exception} wrapped in {@link IgniteException}. If {@code c} implements
+     * {@link ComputeJobMasterLeaveAware}, the job forwards {@code onMasterNodeLeft}
+     * to it.
+     * <p>
+     * The {@code cn} and {@code ak} fields of the returned job are never read in this
+     * method; presumably they are picked up through their annotations by the
+     * framework (to be confirmed against the annotation processors).
+     *
+     * @param c Closure to convert to grid job.
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @return Grid job made out of closure.
+     */
+    @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"})
+    private ComputeJob job(final Callable<?> c, @Nullable final String 
cacheName, final Object affKey) {
+        A.notNull(c, "job");
+
+        if (c instanceof ComputeJobMasterLeaveAware) {
+            return new GridMasterLeaveAwareComputeJobAdapter() {
+                /** Copy of the {@code cacheName} argument. */
+                @GridCacheName
+                private final String cn = cacheName;
+
+                /** Copy of the {@code affKey} argument. */
+                @GridCacheAffinityKeyMapped
+                private final Object ak = affKey;
+
+                @Override public Object execute() {
+                    try {
+                        return c.call();
+                    }
+                    catch (Exception e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                @Override public void onMasterNodeLeft(ComputeTaskSession ses) 
throws IgniteCheckedException {
+                    ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
+                }
+            };
+        }
+        else {
+            return new ComputeJobAdapter() {
+                /** Copy of the {@code cacheName} argument. */
+                @GridCacheName
+                private final String cn = cacheName;
+
+                /** Copy of the {@code affKey} argument. */
+                @GridCacheAffinityKeyMapped
+                private final Object ak = affKey;
+
+                @Override public Object execute() {
+                    try {
+                        return c.call();
+                    }
+                    catch (Exception e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            };
+        }
+    }
+
+    /**
+     * Wraps a runnable into a compute job. The job's {@code execute()} runs the
+     * runnable and returns {@code null}.
+     * <p>
+     * If the runnable implements {@link ComputeJobMasterLeaveAware}, the resulting job
+     * forwards {@code onMasterNodeLeft} calls to it.
+     *
+     * @param r Closure to convert to grid job.
+     * @return Grid job made out of closure.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private static ComputeJob job(final Runnable r) {
+        A.notNull(r, "job");
+
+        if (r instanceof ComputeJobMasterLeaveAware) {
+            return new GridMasterLeaveAwareComputeJobAdapter() {
+                @Nullable @Override public Object execute() {
+                    r.run();
+
+                    return null;
+                }
+
+                @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException {
+                    ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
+                }
+            };
+        }
+
+        // Plain runnable: no master-leave callback to forward.
+        return new ComputeJobAdapter() {
+            @Nullable @Override public Object execute() {
+                r.run();
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Converts given closure to a grid job that also carries a cache name and an
+     * affinity key.
+     * <p>
+     * The returned job's {@code execute()} runs {@code r} and returns {@code null}.
+     * If {@code r} implements {@link ComputeJobMasterLeaveAware}, the job forwards
+     * {@code onMasterNodeLeft} to it.
+     * <p>
+     * The {@code cn} and {@code ak} fields of the returned job are never read in this
+     * method; presumably they are picked up through their annotations by the
+     * framework (to be confirmed against the annotation processors).
+     *
+     * @param r Closure to convert to grid job.
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @return Grid job made out of closure.
+     */
+    @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"})
+    private ComputeJob job(final Runnable r, @Nullable final String cacheName, 
final Object affKey) {
+        A.notNull(r, "job");
+
+        if (r instanceof ComputeJobMasterLeaveAware) {
+            return new GridMasterLeaveAwareComputeJobAdapter() {
+                /** Copy of the {@code cacheName} argument. */
+                @GridCacheName
+                private final String cn = cacheName;
+
+                /** Copy of the {@code affKey} argument. */
+                @GridCacheAffinityKeyMapped
+                private final Object ak = affKey;
+
+                @Nullable @Override public Object execute() {
+                    r.run();
+
+                    return null;
+                }
+
+                @Override public void onMasterNodeLeft(ComputeTaskSession ses) 
throws IgniteCheckedException {
+                    ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
+                }
+            };
+        }
+        else {
+            return new ComputeJobAdapter() {
+                /** Copy of the {@code cacheName} argument. */
+                @GridCacheName
+                private final String cn = cacheName;
+
+                /** Copy of the {@code affKey} argument. */
+                @GridCacheAffinityKeyMapped
+                private final Object ak = affKey;
+
+                @Nullable @Override public Object execute() {
+                    r.run();
+
+                    return null;
+                }
+            };
+        }
+    }
+
+    /** */
+    private class JobMapper {
+        /** */
+        private final Map<ComputeJob, ClusterNode> map;
+
+        /** */
+        private boolean hadLocNode;
+
+        /**
+         * @param map Jobs map.
+         */
+        private JobMapper(Map<ComputeJob, ClusterNode> map) {
+            assert map != null;
+            assert map.isEmpty();
+
+            this.map = map;
+        }
+
+        /**
+         * @param job Job.
+         * @param node Node.
+         * @throws IgniteCheckedException In case of error.
+         */
+        public void map(ComputeJob job, ClusterNode node) throws 
IgniteCheckedException {
+            assert job != null;
+            assert node != null;
+
+            if (ctx.localNodeId().equals(node.id())) {
+                if (hadLocNode) {
+                    IgniteMarshaller marsh = ctx.config().getMarshaller();
+
+                    job = marsh.unmarshal(marsh.marshal(job), null);
+                }
+                else
+                    hadLocNode = true;
+            }
+
+            map.put(job, node);
+        }
+    }
+
+    /**
+     * No-reduce task adapter: a peer-deploy-aware task with a {@code Void} result
+     * whose {@code reduce} step ignores the job results and returns {@code null}.
+     */
+    private abstract static class TaskNoReduceAdapter<T> extends 
GridPeerDeployAwareTaskAdapter<T, Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param pda Peer deploy aware instance, passed on to the superclass.
+         */
+        protected TaskNoReduceAdapter(@Nullable GridPeerDeployAware pda) {
+            super(pda);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation always returns {@code null}.
+         */
+        @Nullable @Override public Void reduce(List<ComputeJobResult> results) 
throws IgniteCheckedException {
+            return null;
+        }
+    }
+
+    /**
+     * Task that is free of dragged in enclosing context for the method
+     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection)}.
+     * <p>
+     * Holds a call mode together with a collection of runnables and hands both to
+     * {@code absMap} when the task is mapped.
+     */
+    private class T1 extends TaskNoReduceAdapter<Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Load balancer; not assigned in this class, presumably injected via the annotation. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /** Call mode (first element) and jobs (second element). */
+        private IgniteBiTuple<GridClosureCallMode, Collection<? extends 
Runnable>> t;
+
+        /**
+         * @param mode Call mode.
+         * @param jobs Collection of jobs; also used to derive the peer-deploy-aware instance.
+         */
+        private T1(GridClosureCallMode mode, Collection<? extends Runnable> 
jobs) {
+            super(U.peerDeployAware0(jobs));
+
+            t = F.<
+                GridClosureCallMode,
+                Collection<? extends Runnable>
+                >t(mode, jobs);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Delegates to {@code absMap} with the stored mode and jobs, the given subgrid
+         * and the load balancer.
+         */
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            return absMap(t.get1(), t.get2(), subgrid, lb);
+        }
+    }
+
+    /**
+     * Task that is free of dragged in enclosing context for the method
+     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection)}.
+     * <p>
+     * Holds a call mode together with a single runnable and hands both to
+     * {@code absMap} when the task is mapped.
+     */
+    private class T2 extends TaskNoReduceAdapter<Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Load balancer; not assigned in this class, presumably injected via the annotation. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /** Call mode (first element) and job (second element). */
+        private IgniteBiTuple<GridClosureCallMode, Runnable> t;
+
+        /**
+         * @param mode Call mode.
+         * @param job Job; also used to derive the peer-deploy-aware instance.
+         */
+        private T2(GridClosureCallMode mode, Runnable job) {
+            super(U.peerDeployAware(job));
+
+            t = F.t(mode, job);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Delegates to {@code absMap}, passing the stored job as a one-element list.
+         */
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            return absMap(t.get1(), F.asList(t.get2()), subgrid, lb);
+        }
+    }
+
+    /**
+     * Task that is free of dragged in enclosing context for the method
+     * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection,
+     * org.apache.ignite.lang.IgniteReducer, Collection)}.
+     * <p>
+     * Holds a call mode, a collection of callables and a reducer. Job results are fed
+     * to the reducer as they arrive, and the task result is the reducer's output.
+     */
+    private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /** Call mode, jobs and reducer, in that order. */
+        private GridTuple3<GridClosureCallMode, Collection<? extends Callable<R1>>, IgniteReducer<R1, R2>> t;
+
+        /**
+         * @param mode Call mode.
+         * @param jobs Collection of jobs.
+         * @param rdc Reducer.
+         */
+        private T3(GridClosureCallMode mode, Collection<? extends Callable<R1>> jobs, IgniteReducer<R1, R2> rdc) {
+            super(U.peerDeployAware0(jobs));
+
+            t = F.<GridClosureCallMode, Collection<? extends Callable<R1>>, IgniteReducer<R1, R2>>t(mode, jobs, rdc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            return outMap(t.get1(), t.get2(), subgrid, lb);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+            throws IgniteCheckedException {
+            ComputeJobResultPolicy resPlc = super.result(res, rcvd);
+
+            // Failed or failed-over results never reach the reducer.
+            if (res.getException() != null || resPlc == FAILOVER)
+                return resPlc;
+
+            // If reducer returned false - reduce right away.
+            return t.get3().collect((R1)res.getData()) ? resPlc : REDUCE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R2 reduce(List<ComputeJobResult> res) {
+            IgniteReducer<R1, R2> rdc = t.get3();
+
+            return rdc.reduce();
+        }
+    }
+
+    /**
+     * Task that turns one {@link Runnable}, a cache name and an affinity key into a
+     * single job and maps it to the node picked by the load balancer.
+     */
+    private class T4 extends TaskNoReduceAdapter<Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Affinity key. */
+        private Object affKey;
+
+        /** Runnable to execute. */
+        private Runnable job;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param cacheName Cache name.
+         * @param affKey Affinity key.
+         * @param job Job.
+         */
+        private T4(@Nullable String cacheName, Object affKey, Runnable job) {
+            super(U.peerDeployAware0(job));
+
+            this.cacheName = cacheName;
+            this.affKey = affKey;
+            this.job = job;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            ComputeJob affJob = job(job, cacheName, affKey);
+
+            ClusterNode node = lb.getBalancedNode(affJob, null);
+
+            return Collections.singletonMap(affJob, node);
+        }
+    }
+
+    /**
+     * Task that turns one {@link Callable}, a cache name and an affinity key into a
+     * single job, maps it to the node picked by the load balancer and returns the
+     * data of the first job result that carries no exception.
+     */
+    private class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Affinity key. */
+        private Object affKey;
+
+        /** Callable to execute. */
+        private Callable<R> job;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param cacheName Cache name.
+         * @param affKey Affinity key.
+         * @param job Job.
+         */
+        private T5(@Nullable String cacheName, Object affKey, Callable<R> job) {
+            super(U.peerDeployAware0(job));
+
+            this.cacheName = cacheName;
+            this.affKey = affKey;
+            this.job = job;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            ComputeJob affJob = job(job, cacheName, affKey);
+
+            ClusterNode node = lb.getBalancedNode(affJob, null);
+
+            return Collections.singletonMap(affJob, node);
+        }
+
+        /** {@inheritDoc} */
+        @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException {
+            for (ComputeJobResult r : res) {
+                // Skip failed results, the first successful one wins.
+                if (r.getException() != null)
+                    continue;
+
+                return r.getData();
+            }
+
+            throw new IgniteCheckedException("Failed to find successful job result: " + res);
+        }
+    }
+
+    /**
+     * Task that is free of dragged in enclosing context for the method
+     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection)}.
+     * <p>
+     * Holds a call mode and a collection of callables; mapping is delegated to
+     * {@code outMap} and reduction to {@code F.jobResults}.
+     */
+    private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, 
Collection<R>> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Call mode. */
+        private final GridClosureCallMode mode;
+
+        /** Jobs to execute. */
+        private final Collection<? extends Callable<R>> jobs;
+
+        /** Load balancer; not assigned in this class, presumably injected via the annotation. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param mode Call mode.
+         * @param jobs Collection of jobs; also used to derive the peer-deploy-aware instance.
+         */
+        private T6(
+            GridClosureCallMode mode,
+            Collection<? extends Callable<R>> jobs) {
+            super(U.peerDeployAware0(jobs));
+
+            this.mode = mode;
+            this.jobs = jobs;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Delegates to {@code outMap} with the stored mode and jobs.
+         */
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            return outMap(mode, jobs, subgrid, lb);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns whatever {@code F.jobResults} produces for the given results.
+         */
+        @Override public Collection<R> reduce(List<ComputeJobResult> res) {
+            return F.jobResults(res);
+        }
+    }
+
+    /**
+     * Task that is free of dragged in enclosing context for the method
+     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection)}.
+     * <p>
+     * Holds a call mode and a single callable; the task result is the data of the
+     * first job result that carries no exception.
+     */
+    private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Call mode (first element) and job (second element). */
+        private IgniteBiTuple<GridClosureCallMode, Callable<R>> t;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param mode Call mode.
+         * @param job Job.
+         */
+        private T7(GridClosureCallMode mode, Callable<R> job) {
+            super(U.peerDeployAware(job));
+
+            t = F.t(mode, job);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            return outMap(t.get1(), F.asList(t.get2()), subgrid, lb);
+        }
+
+        /** {@inheritDoc} */
+        @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException {
+            for (ComputeJobResult r : res) {
+                // Skip failed results, the first successful one wins.
+                if (r.getException() != null)
+                    continue;
+
+                return r.getData();
+            }
+
+            throw new IgniteCheckedException("Failed to find successful job result: " + res);
+        }
+    }
+
+    /**
+     * Task that applies one closure to one argument: it builds a single job, maps it
+     * to the node picked by the load balancer and returns the data of the first job
+     * result that carries no exception.
+     */
+    private class T8<T, R> extends GridPeerDeployAwareTaskAdapter<Void, R> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Closure to apply. */
+        private IgniteClosure<T, R> job;
+
+        /** Closure argument. */
+        private T arg;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param job Job.
+         * @param arg Optional job argument.
+         */
+        private T8(IgniteClosure<T, R> job, @Nullable T arg) {
+            super(U.peerDeployAware(job));
+
+            this.job = job;
+            this.arg = arg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            // The method parameter hides the field, so the closure argument is read via "this".
+            ComputeJob cloJob = job(job, this.arg);
+
+            ClusterNode node = lb.getBalancedNode(cloJob, null);
+
+            return Collections.singletonMap(cloJob, node);
+        }
+
+        /** {@inheritDoc} */
+        @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException {
+            for (ComputeJobResult r : res) {
+                // Skip failed results, the first successful one wins.
+                if (r.getException() != null)
+                    continue;
+
+                return r.getData();
+            }
+
+            throw new IgniteCheckedException("Failed to find successful job result: " + res);
+        }
+    }
+
+    /**
+     * Task that applies one closure to every element of an argument collection:
+     * one job per argument, each mapped to the node picked by the load balancer.
+     * Reduction is delegated to {@code F.jobResults}.
+     */
+    private class T9<T, R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Closure to apply. */
+        private IgniteClosure<T, R> job;
+
+        /** Closure arguments. */
+        private Collection<? extends T> args;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param job Job.
+         * @param args Job arguments.
+         */
+        private T9(IgniteClosure<T, R> job, Collection<? extends T> args) {
+            super(U.peerDeployAware(job));
+
+            this.job = job;
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap<>(args.size(), 1);
+
+            JobMapper mapper = new JobMapper(jobs);
+
+            for (T a : args) {
+                ComputeJob cloJob = job(job, a);
+
+                ClusterNode node = lb.getBalancedNode(cloJob, null);
+
+                mapper.map(cloJob, node);
+            }
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<R> reduce(List<ComputeJobResult> res) throws IgniteCheckedException {
+            return F.jobResults(res);
+        }
+    }
+
+    /**
+     * Task that applies one closure to every element of an argument collection and
+     * folds the job results with a reducer: results are fed to the reducer as they
+     * arrive, and the task result is the reducer's output.
+     */
+    private class T10<T, R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Closure to apply. */
+        private IgniteClosure<T, R1> job;
+
+        /** Closure arguments. */
+        private Collection<? extends T> args;
+
+        /** Reducer. */
+        private IgniteReducer<R1, R2> rdc;
+
+        /** Load balancer. */
+        @IgniteLoadBalancerResource
+        private ComputeLoadBalancer lb;
+
+        /**
+         * @param job Job.
+         * @param args Job arguments.
+         * @param rdc Reducer.
+         */
+        private T10(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc) {
+            super(U.peerDeployAware(job));
+
+            this.job = job;
+            this.args = args;
+            this.rdc = rdc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap<>(args.size(), 1);
+
+            JobMapper mapper = new JobMapper(jobs);
+
+            for (T a : args) {
+                ComputeJob cloJob = job(job, a);
+
+                ClusterNode node = lb.getBalancedNode(cloJob, null);
+
+                mapper.map(cloJob, node);
+            }
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+            throws IgniteCheckedException {
+            ComputeJobResultPolicy resPlc = super.result(res, rcvd);
+
+            // Failed or failed-over results never reach the reducer.
+            if (res.getException() != null || resPlc == FAILOVER)
+                return resPlc;
+
+            // If reducer returned false - reduce right away.
+            return rdc.collect((R1)res.getData()) ? resPlc : REDUCE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R2 reduce(List<ComputeJobResult> res) throws IgniteCheckedException {
+            return rdc.reduce();
+        }
+    }
+
+    /**
+     * Task that creates one job per node of the subgrid, each applying the same
+     * closure to the same argument. Reduction is delegated to {@code F.jobResults}.
+     */
+    private class T11<T, R> extends GridPeerDeployAwareTaskAdapter<Void, 
Collection<R>> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Closure to apply. */
+        private final IgniteClosure<T, R> job;
+
+        /** Closure argument. */
+        private final T arg;
+
+        /**
+         * @param job Job; also used to derive the peer-deploy-aware instance.
+         * @param arg Job argument.
+         * @param nodes Collection of nodes. NOTE(review): not referenced by this
+         *      constructor or stored anywhere in the class; mapping uses the subgrid
+         *      passed to {@code map} instead.
+         */
+        private T11(IgniteClosure<T, R> job, @Nullable T arg, 
Collection<ClusterNode> nodes) {
+            super(U.peerDeployAware(job));
+
+            this.job = job;
+            this.arg = arg;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns an empty map when {@code F.isEmpty(subgrid)} holds. Otherwise builds
+         * a fresh job for every node in {@code subgrid} and registers it through a
+         * {@code JobMapper}. The {@code arg} parameter hides the field, which is why
+         * the closure argument is read as {@code this.arg}.
+         */
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid, @Nullable Void arg)
+            throws IgniteCheckedException {
+            if (F.isEmpty(subgrid))
+                return Collections.emptyMap();
+
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(subgrid.size(), 
1);
+
+            JobMapper mapper = new JobMapper(map);
+
+            for (ClusterNode n : subgrid)
+                mapper.map(job(job, this.arg), n);
+
+            return map;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns whatever {@code F.jobResults} produces for the given results.
+         */
+        @Override public Collection<R> reduce(List<ComputeJobResult> res) {
+            return F.jobResults(res);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java
new file mode 100644
index 0000000..e28784e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.apache.ignite.compute.*;
+
+/**
+ * Job adapter implementing {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}.
+ * <p>
+ * The class is abstract and declares no methods of its own: subclasses supply both
+ * the job body and the {@code onMasterNodeLeft} callback.
+ */
+public abstract class GridMasterLeaveAwareComputeJobAdapter extends 
ComputeJobAdapter
+    implements ComputeJobMasterLeaveAware {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * No-arg constructor; does nothing beyond the implicit superclass construction.
+     */
+    protected GridMasterLeaveAwareComputeJobAdapter() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java
new file mode 100644
index 0000000..79768a7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Peer deployment aware task adapter. Deployment information is either supplied at
+ * construction time or detected on first use.
+ */
+public abstract class GridPeerDeployAwareTaskAdapter<T, R> extends ComputeTaskAdapter<T, R>
+    implements GridPeerDeployAware {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Peer deploy aware class. */
+    private transient GridPeerDeployAware pda;
+
+    /**
+     * Constructor that receives deployment information for task.
+     *
+     * @param pda Deployment information; when {@code null} it is detected on first use.
+     */
+    protected GridPeerDeployAwareTaskAdapter(@Nullable GridPeerDeployAware pda) {
+        this.pda = pda;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Class<?> deployClass() {
+        return resolvePda().deployClass();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClassLoader classLoader() {
+        return resolvePda().classLoader();
+    }
+
+    /**
+     * Returns the deployment information, detecting it from this task if none is set yet.
+     *
+     * @return Peer deploy aware instance.
+     */
+    private GridPeerDeployAware resolvePda() {
+        if (pda == null)
+            pda = U.detectPeerDeployAware(this);
+
+        return pda;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html
new file mode 100644
index 0000000..cbdb317
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    TODO.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
new file mode 100644
index 0000000..17c7a0a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Continuous routine handler.
+ * <p>
+ * Declares the callbacks used to register and unregister a listener for a
+ * routine, to notify the local callback, and to marshal/unmarshal inner
+ * objects when peer deployment is enabled. Implementations must also be
+ * {@link Externalizable}.
+ */
+public interface GridContinuousHandler extends Externalizable {
+    /**
+     * Registers listener.
+     *
+     * @param nodeId ID of the node that started routine.
+     * @param routineId Routine ID.
+     * @param ctx Kernal context.
+     * @return Whether listener was actually registered.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public boolean register(UUID nodeId, UUID routineId, GridKernalContext 
ctx) throws IgniteCheckedException;
+
+    /**
+     * Callback called after listener is registered and acknowledgement is
+     * sent.
+     *
+     * @param routineId Routine ID.
+     * @param ctx Kernal context.
+     */
+    public void onListenerRegistered(UUID routineId, GridKernalContext ctx);
+
+    /**
+     * Unregisters listener.
+     *
+     * @param routineId Routine ID.
+     * @param ctx Kernal context.
+     */
+    public void unregister(UUID routineId, GridKernalContext ctx);
+
+    /**
+     * Notifies local callback.
+     *
+     * @param nodeId ID of the node where notification came from.
+     * @param routineId Routine ID.
+     * @param objs Notification objects.
+     * @param ctx Kernal context.
+     */
+    public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> 
objs, GridKernalContext ctx);
+
+    /**
+     * Deploys and marshals inner objects (called only if peer deployment is
+     * enabled).
+     *
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void p2pMarshal(GridKernalContext ctx) throws 
IgniteCheckedException;
+
+    /**
+     * Unmarshals inner objects (called only if peer deployment is enabled).
+     *
+     * @param nodeId Sender node ID.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws 
IgniteCheckedException;
+
+    /**
+     * @return Topic for ordered notifications. If {@code null}, notifications
+     * will be sent in non-ordered messages.
+     */
+    @Nullable public Object orderedTopic();
+
+    /**
+     * @return {@code True} if this handler is for events.
+     */
+    public boolean isForEvents();
+
+    /**
+     * @return {@code True} if this handler is for messaging.
+     */
+    public boolean isForMessaging();
+
+    /**
+     * @return {@code True} if this handler is for continuous queries.
+     */
+    public boolean isForQuery();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
new file mode 100644
index 0000000..1c63a8a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+import static 
org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.*;
+
+/**
+ * Continuous processor message.
+ * <p>
+ * Carries a message type, a routine ID, an optional future ID and an optional
+ * payload. The payload object ({@code data}) is annotated with
+ * {@code GridDirectTransient} and is not written by
+ * {@link #writeTo(ByteBuffer)}; only its serialized form ({@code dataBytes})
+ * is. {@link #writeTo(ByteBuffer)} and {@link #readFrom(ByteBuffer)} process
+ * the fields in the same order: {@code dataBytes}, {@code futId},
+ * {@code routineId}, {@code type}.
+ */
+public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message type. */
+    private GridContinuousMessageType type;
+
+    /** Routine ID. */
+    private UUID routineId;
+
+    /** Optional message data. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Object data;
+
+    /** Serialized message data. */
+    private byte[] dataBytes;
+
+    /** Future ID for synchronous event notifications. */
+    private IgniteUuid futId;
+
+    /**
+     * Required by {@link Externalizable}. Also used by {@link #clone()} to
+     * create the instance that is then filled by {@code clone0}.
+     */
+    public GridContinuousMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param type Message type (asserted to be non-null).
+     * @param routineId Routine ID. May be {@code null} only when {@code type}
+     *      is {@code MSG_EVT_ACK} (checked by assertion).
+     * @param futId Future ID.
+     * @param data Optional message data.
+     */
+    GridContinuousMessage(GridContinuousMessageType type,
+        @Nullable UUID routineId,
+        @Nullable IgniteUuid futId,
+        @Nullable Object data) {
+        assert type != null;
+        assert routineId != null || type == MSG_EVT_ACK;
+
+        this.type = type;
+        this.routineId = routineId;
+        this.futId = futId;
+        this.data = data;
+    }
+
+    /**
+     * @return Message type.
+     */
+    public GridContinuousMessageType type() {
+        return type;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @param <T> Type the stored data object is cast to (the cast is
+     *      unchecked).
+     * @return Message data.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T data() {
+        return (T)data;
+    }
+
+    /**
+     * @param data Message data.
+     */
+    public void data(Object data) {
+        this.data = data;
+    }
+
+    /**
+     * @return Serialized message data.
+     */
+    public byte[] dataBytes() {
+        return dataBytes;
+    }
+
+    /**
+     * @param dataBytes Serialized message data.
+     */
+    public void dataBytes(byte[] dataBytes) {
+        this.dataBytes = dataBytes;
+    }
+
+    /**
+     * @return Future ID for synchronous event notification.
+     */
+    @Nullable public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new instance through the no-arg constructor and fills it via
+     * {@code clone0}; {@code super.clone()} is not called.
+     */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridContinuousMessage _clone = new GridContinuousMessage();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Copies every field of this message into {@code msg} by plain
+     * assignment, so the data object and the {@code dataBytes} array are
+     * shared between both messages rather than duplicated. {@code msg} is
+     * cast to {@link GridContinuousMessage}.
+     */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) {
+        GridContinuousMessage clone = (GridContinuousMessage)msg;
+
+        clone.type = type;
+        clone.routineId = routineId;
+        clone.futId = futId;
+        clone.data = data;
+        clone.dataBytes = dataBytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes the {@link #directType()} byte once (tracked by
+     * {@code commState.typeWritten}) and then the fields in the order
+     * {@code dataBytes}, {@code futId}, {@code routineId}, {@code type}.
+     * {@code commState.idx} is incremented after each field and selects the
+     * starting {@code case}, so a later call continues with the first field
+     * that has not been written; the cases fall through intentionally.
+     *
+     * @return {@code false} as soon as one of the {@code commState.put*}
+     *      calls returns {@code false}, {@code true} otherwise.
+     */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 0:
+                if (!commState.putByteArray(dataBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 1:
+                if (!commState.putGridUuid(futId))
+                    return false;
+
+                commState.idx++;
+
+            case 2:
+                if (!commState.putUuid(routineId))
+                    return false;
+
+                commState.idx++;
+
+            case 3:
+                if (!commState.putEnum(type))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the fields in the order {@code dataBytes}, {@code futId},
+     * {@code routineId}, {@code type}. {@code commState.idx} is incremented
+     * after each field and selects the starting {@code case}, so a later call
+     * continues with the first field that has not been read; the cases fall
+     * through intentionally. The type is read as a single byte and mapped
+     * with {@code GridContinuousMessageType.fromOrdinal}, which yields
+     * {@code null} for an ordinal outside the range of declared constants.
+     *
+     * @return {@code false} if a getter returned its {@code *_NOT_READ}
+     *      marker or fewer than one byte remains for the type,
+     *      {@code true} otherwise.
+     */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        switch (commState.idx) {
+            case 0:
+                byte[] dataBytes0 = commState.getByteArray();
+
+                if (dataBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                dataBytes = dataBytes0;
+
+                commState.idx++;
+
+            case 1:
+                IgniteUuid futId0 = commState.getGridUuid();
+
+                if (futId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                futId = futId0;
+
+                commState.idx++;
+
+            case 2:
+                UUID routineId0 = commState.getUuid();
+
+                if (routineId0 == UUID_NOT_READ)
+                    return false;
+
+                routineId = routineId0;
+
+                commState.idx++;
+
+            case 3:
+                if (buf.remaining() < 1)
+                    return false;
+
+                byte type0 = commState.getByte();
+
+                type = fromOrdinal(type0);
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Constant {@code 60}, the byte that {@link #writeTo(ByteBuffer)}
+     *      emits before the fields.
+     */
+    @Override public byte directType() {
+        return 60;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridContinuousMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
new file mode 100644
index 0000000..eb33613
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Continuous processor message types.
+ * <p>
+ * {@link #fromOrdinal(byte)} maps a single byte back to a constant by its
+ * ordinal, so the declaration order of the constants determines that mapping.
+ */
+enum GridContinuousMessageType {
+    /** Consume start request. */
+    MSG_START_REQ,
+
+    /** Consume start acknowledgement. */
+    MSG_START_ACK,
+
+    /** Consume stop request. */
+    MSG_STOP_REQ,
+
+    /** Consume stop acknowledgement. */
+    MSG_STOP_ACK,
+
+    /** Remote event notification. */
+    MSG_EVT_NOTIFICATION,
+
+    /** Event notification acknowledgement for synchronous events. */
+    MSG_EVT_ACK;
+
+    /**
+     * Enumerated values, captured once from {@code values()} and indexed by
+     * ordinal in {@link #fromOrdinal(byte)}.
+     */
+    private static final GridContinuousMessageType[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value, or {@code null} if {@code ord} is negative
+     *      or not less than the number of declared constants.
+     */
+    @Nullable public static GridContinuousMessageType fromOrdinal(byte ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
+}

Reply via email to