http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
new file mode 100644
index 0000000..5ede18e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+
+/**
+ * Task executor.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Job tracker; obtained from the component context in {@link #onKernalStart()}. */
+    private HadoopJobTracker jobTracker;
+
+    /** Tasks submitted through {@code run()} that have not finished yet, grouped by job ID. */
+    private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> 
jobs = new ConcurrentHashMap<>();
+
+    /** Executor service to run tasks; created in {@link #onKernalStart()}. */
+    private HadoopExecutorService exec;
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        jobTracker = ctx.jobTracker();
+
+        exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
+            ctx.configuration().getMaxParallelTasks(), 
ctx.configuration().getMaxTaskQueueSize());
+    }
+
+    /**
+     * Shuts the executor down, waiting up to 3 seconds, and, when {@code cancel} is set,
+     * cancels the tasks of every job that is still registered. Does nothing if the executor
+     * was never created.
+     *
+     * @param cancel Whether tasks of the registered jobs must be cancelled.
+     */
+    @Override public void onKernalStop(boolean cancel) {
+        if (exec == null)
+            return;
+
+        exec.shutdown(3000);
+
+        if (!cancel)
+            return;
+
+        for (HadoopJobId jobId : jobs.keySet())
+            cancelTasks(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        if (exec != null && !exec.shutdown(30000))
+            U.warn(log, "Failed to finish running tasks in 30 sec.");
+    }
+
+    /**
+     * Submits the given tasks of a job for local execution.
+     * <p>
+     * Every task is added to the per-job task set before it is submitted and removes itself from
+     * that set when it finishes, right before the job tracker is notified.
+     *
+     * @param job Job the tasks belong to.
+     * @param tasks Tasks to run.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
+                ", tasksCnt=" + tasks.size() + ']');
+
+        Collection<HadoopRunnableTask> jobTasks = jobs.get(job.id());
+
+        if (jobTasks == null) {
+            Collection<HadoopRunnableTask> newTasks = new GridConcurrentHashSet<>();
+
+            // Register atomically: a plain get-then-put would let a racing caller replace the set and
+            // orphan tasks that were already registered, making them unreachable for cancelTasks().
+            Collection<HadoopRunnableTask> prev = jobs.putIfAbsent(job.id(), newTasks);
+
+            jobTasks = prev != null ? prev : newTasks;
+        }
+
+        final Collection<HadoopRunnableTask> executedTasks = jobTasks;
+
+        for (final HadoopTaskInfo info : tasks) {
+            assert info != null;
+
+            HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+                ctx.localNodeId()) {
+                @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                    if (log.isDebugEnabled())
+                        log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
+                            "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
+
+                    // Unregister first so the job's task set is already updated when the tracker is notified.
+                    executedTasks.remove(this);
+
+                    jobTracker.onTaskFinished(info, status);
+                }
+
+                @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx)
+                    throws IgniteCheckedException {
+                    return ctx.shuffle().input(taskCtx);
+                }
+
+                @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx)
+                    throws IgniteCheckedException {
+                    return ctx.shuffle().output(taskCtx);
+                }
+            };
+
+            executedTasks.add(task);
+
+            exec.submit(task);
+        }
+    }
+
+    /**
+     * Cancels every task that is currently registered for the given job ID, i.e. tasks that
+     * are running as well as tasks that are still scheduled for execution.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method.
+     * No more job submissions will be performed via that method for given job ID after this
+     * method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    @Override public void cancelTasks(HadoopJobId jobId) {
+        Collection<HadoopRunnableTask> jobTasks = jobs.get(jobId);
+
+        if (jobTasks == null)
+            return;
+
+        for (HadoopRunnableTask task : jobTasks)
+            task.cancel();
+    }
+
+    /**
+     * Drops the task set of a job once the job reaches the {@code PHASE_COMPLETE} phase.
+     * The set is expected to be empty (or absent) at that point.
+     *
+     * @param meta Job metadata.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
+        if (meta.phase() != HadoopJobPhase.PHASE_COMPLETE)
+            return;
+
+        Collection<HadoopRunnableTask> remaining = jobs.remove(meta.jobId());
+
+        assert remaining == null || remaining.isEmpty();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
new file mode 100644
index 0000000..993ecc9
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.util.worker.GridWorkerListener;
+import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.Collections.newSetFromMap;
+
+/**
+ * Executor service without thread pooling.
+ */
+public class HadoopExecutorService {
+    /** Bounded queue of submitted tasks that are waiting for a free execution slot. */
+    private final LinkedBlockingQueue<Callable<?>> queue;
+
+    /** Workers created by this service that have not reported their stop yet. */
+    private final Collection<GridWorker> workers = newSetFromMap(new 
ConcurrentHashMap8<GridWorker, Boolean>());
+
+    /** Number of occupied execution slots; a slot is only reserved while the count differs from {@link #maxTasks}. */
+    private final AtomicInteger active = new AtomicInteger();
+
+    /** Maximum number of execution slots. */
+    private final int maxTasks;
+
+    /** Grid name passed to the created workers. */
+    private final String gridName;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Set once {@link #shutdown(long)} has been called. */
+    private volatile boolean shutdown;
+
+    /**
+     * Listener attached to every worker created by this service. When a worker stops, it is
+     * unregistered and its execution slot is either handed over to the next queued task or released.
+     */
+    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
+            @Override public void onStopped(GridWorker w) {
+                workers.remove(w);
+
+                if (shutdown) { // After shutdown no queued task is started: just release the slot.
+                    active.decrementAndGet();
+
+                    return;
+                }
+
+                Callable<?> task = queue.poll();
+
+                if (task != null)
+                    startThread(task); // The stopped worker's slot is reused: 'active' stays unchanged.
+                else {
+                    active.decrementAndGet();
+
+                    if (!queue.isEmpty()) // A task may have been enqueued since the poll above.
+                        startFromQueue();
+                }
+            }
+        };
+
+    /**
+     * Creates an executor service with a bounded task queue and a limit on simultaneously
+     * running tasks.
+     *
+     * @param log Logger.
+     * @param gridName Grid name.
+     * @param maxTasks Max number of tasks.
+     * @param maxQueue Max queue length.
+     */
+    public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
+        assert maxTasks > 0 : maxTasks;
+        assert maxQueue > 0 : maxQueue;
+
+        this.gridName = gridName;
+        this.maxTasks = maxTasks;
+
+        queue = new LinkedBlockingQueue<>(maxQueue);
+
+        this.log = log.getLogger(HadoopExecutorService.class);
+    }
+
+    /**
+     * Gets the number of workers that are currently registered with this service.
+     *
+     * @return Number of active workers.
+     */
+    public int active() {
+        return this.workers.size();
+    }
+
+    /**
+     * Submit task.
+     * <p>
+     * The task is started right away in a new worker when the queue is empty and an execution
+     * slot can be reserved. Otherwise it is put into the queue, retrying every 100 ms while the
+     * queue is full. The task is dropped without being run if the service is shut down while
+     * waiting for queue space, or if the calling thread is interrupted (the interrupt status is
+     * restored in that case).
+     *
+     * @param task Task.
+     */
+    public void submit(Callable<?> task) {
+        while (queue.isEmpty()) {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break; // All slots are taken: go on to queueing.
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                startThread(task);
+
+                return; // Started in new thread bypassing queue.
+            }
+        }
+
+        try {
+            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
+                if (shutdown)
+                    return; // Rejected due to shutdown.
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            return; // The task was not queued.
+        }
+
+        startFromQueue(); // A slot may have been released while the task was being queued.
+    }
+
+    /**
+     * Attempts to start task from queue.
+     * <p>
+     * Repeatedly reserves an execution slot and starts the next queued task in it, until either
+     * all {@code maxTasks} slots are occupied or the queue is empty.
+     */
+    private void startFromQueue() {
+        do {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break; // No free slot.
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                Callable<?> task = queue.poll();
+
+                if (task == null) {
+                    int res = active.decrementAndGet(); // Nothing left to run: give the reserved slot back.
+
+                    assert res >= 0 : res;
+
+                    break;
+                }
+
+                startThread(task);
+            }
+        }
+        while (!queue.isEmpty());
+    }
+
+    /**
+     * Wraps the task into a new worker, registers the worker and runs it in a dedicated thread.
+     * Exceptions thrown by the task are logged and not propagated.
+     *
+     * @param task Task.
+     */
+    private void startThread(final Callable<?> task) {
+        final String name;
+
+        if (!(task instanceof HadoopRunnableTask))
+            name = task.toString();
+        else {
+            HadoopTaskInfo taskInfo = ((HadoopRunnableTask)task).taskInfo();
+
+            name = "Hadoop-task-" + taskInfo.jobId() + "-" + taskInfo.type() + "-" + taskInfo.taskNumber() + "-" +
+                taskInfo.attempt();
+        }
+
+        GridWorker worker = new GridWorker(gridName, name, log, lsnr) {
+            @Override protected void body() {
+                try {
+                    task.call();
+                }
+                catch (Exception e) {
+                    log.error("Failed to execute task: " + task, e);
+                }
+            }
+        };
+
+        workers.add(worker);
+
+        // If shutdown has already been requested, the worker is cancelled before its thread starts.
+        if (shutdown)
+            worker.cancel();
+
+        new IgniteThread(worker).start();
+    }
+
+    /**
+     * Shuts down this executor service.
+     * <p>
+     * Marks the service as shut down, cancels all registered workers and then checks every
+     * 100 ms until no workers are left or the timeout has elapsed. If the calling thread is
+     * interrupted while waiting, the wait is abandoned and the thread's interrupt status is
+     * restored so that callers can still observe the interruption.
+     *
+     * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
+     * @return {@code true} If all tasks completed.
+     */
+    public boolean shutdown(long awaitTimeMillis) {
+        shutdown = true;
+
+        for (GridWorker w : workers)
+            w.cancel();
+
+        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
+            try {
+                Thread.sleep(100);
+
+                awaitTimeMillis -= 100;
+            }
+            catch (InterruptedException ignored) {
+                // Thread.sleep() clears the interrupt status when it throws: re-assert it, as submit() does.
+                Thread.currentThread().interrupt();
+
+                break;
+            }
+        }
+
+        return workers.isEmpty();
+    }
+
+    /**
+     * Tells whether shutdown of this service has been requested.
+     *
+     * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
+     */
+    public boolean isShutdown() {
+        return this.shutdown;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
new file mode 100644
index 0000000..a57efe6
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap;
+import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
+
+/**
+ * Runnable task.
+ */
+public abstract class HadoopRunnableTask implements Callable<Void> {
+    /** Memory passed to the combiner input collection when one is created. */
+    private final GridUnsafeMemory mem;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Job this task belongs to. */
+    private final HadoopJob job;
+
+    /** Task to run. */
+    private final HadoopTaskInfo info;
+
+    /** Submit time. */
+    private final long submitTs = U.currentTimeMillis();
+
+    /** Execution start timestamp. */
+    private long execStartTs;
+
+    /** Execution end timestamp. */
+    private long execEndTs;
+
+    /** Collection that receives map output when the job has a combiner and is read back by the combine step. */
+    private HadoopMultimap combinerInput;
+
+    /** Task context; assigned in {@link #call()}. */
+    private volatile HadoopTaskContext ctx;
+
+    /** Set once {@link #cancel()} has been called. */
+    private volatile boolean cancelled;
+
+    /** Node id. */
+    private UUID nodeId;
+
+    /**
+     * Creates a runnable task for the given job and task info.
+     *
+     * @param log Log.
+     * @param job Job.
+     * @param mem Memory.
+     * @param info Task info.
+     * @param nodeId Node id.
+     */
+    protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
+        UUID nodeId) {
+        this.job = job;
+        this.info = info;
+        this.mem = mem;
+        this.nodeId = nodeId;
+
+        this.log = log.getLogger(HadoopRunnableTask.class);
+    }
+
+    /**
+     * @return Wait time.
+     */
+    public long waitTime() {
+        return execStartTs - submitTs;
+    }
+
+    /**
+     * @return Execution time.
+     */
+    public long executionTime() {
+        return execEndTs - execStartTs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Implements actual task running.
+     * <p>
+     * Runs the task and, for a map task of a job that has a combiner, the combine step right
+     * after it. The outcome is always reported through {@link #onTaskFinished(HadoopTaskStatus)}
+     * from the {@code finally} section: {@code COMPLETED} on success, {@code CANCELED} when a
+     * {@link HadoopTaskCancelledException} was thrown and {@code FAILED} on any other throwable.
+     * Only {@link Error}s are rethrown; other failures are logged and reported via the status.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    void call0() throws IgniteCheckedException {
+        execStartTs = U.currentTimeMillis();
+
+        Throwable err = null;
+
+        HadoopTaskState state = HadoopTaskState.COMPLETED;
+
+        HadoopPerformanceCounter perfCntr = null;
+
+        try {
+            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), 
nodeId);
+
+            perfCntr.onTaskSubmit(info, submitTs);
+            perfCntr.onTaskPrepare(info, execStartTs);
+
+            ctx.prepareTaskEnvironment();
+
+            runTask(perfCntr);
+
+            if (info.type() == MAP && job.info().hasCombiner()) { // Combine step reuses this context under a temporary COMBINE task info.
+                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), 
info.taskNumber(), info.attempt(), null));
+
+                try {
+                    runTask(perfCntr);
+                }
+                finally {
+                    ctx.taskInfo(info); // Restore the original task info.
+                }
+            }
+        }
+        catch (HadoopTaskCancelledException ignored) {
+            state = HadoopTaskState.CANCELED;
+        }
+        catch (Throwable e) {
+            state = HadoopTaskState.FAILED;
+            err = e;
+
+            U.error(log, "Task execution failed.", e);
+
+            if (e instanceof Error)
+                throw e; // The finally section below still reports the FAILED status.
+        }
+        finally {
+            execEndTs = U.currentTimeMillis();
+
+            if (perfCntr != null)
+                perfCntr.onTaskFinish(info, execEndTs);
+
+            onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : 
ctx.counters()));
+
+            if (combinerInput != null)
+                combinerInput.close();
+
+            if (ctx != null)
+                ctx.cleanupTaskEnvironment();
+        }
+    }
+
+    /**
+     * Runs the task currently set on the context: opens its output and input, binds both to the
+     * context, records the start in the performance counter and runs the context. Throws
+     * {@link HadoopTaskCancelledException} right away if the task has already been cancelled.
+     *
+     * @param perfCntr Performance counter.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        try (HadoopTaskOutput taskOut = createOutputInternal(ctx);
+             HadoopTaskInput taskIn = createInputInternal(ctx)) {
+            ctx.input(taskIn);
+            ctx.output(taskOut);
+
+            HadoopTaskInfo curInfo = ctx.taskInfo();
+            long startTs = U.currentTimeMillis();
+
+            perfCntr.onTaskStart(curInfo, startTs);
+
+            ctx.run();
+        }
+    }
+
+    /**
+     * Marks the task as cancelled and, if the task context has already been created,
+     * cancels the context as well.
+     */
+    public void cancel() {
+        cancelled = true;
+
+        if (ctx == null)
+            return;
+
+        ctx.cancel();
+    }
+
+    /**
+     * Callback invoked from the {@code finally} section of task execution with the final
+     * state, the failure cause (if any) and the counters of the task.
+     *
+     * @param status Task status.
+     */
+    protected abstract void onTaskFinished(HadoopTaskStatus status);
+
+    /**
+     * Resolves the input for the task type currently set on the context: none for setup, map,
+     * commit and abort tasks, the combiner input for combine tasks and
+     * {@link #createInput(HadoopTaskContext)} for any other type.
+     *
+     * @param ctx Task context.
+     * @return Task input, or {@code null} if the task type takes none.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case COMBINE:
+                assert combinerInput != null;
+
+                return combinerInput.input(ctx);
+
+            case SETUP:
+            case MAP:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            default:
+                return createInput(ctx);
+        }
+    }
+
+    /**
+     * Creates the task input for task types whose input is not resolved internally
+     * (see {@code createInputInternal}).
+     *
+     * @param ctx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) 
throws IgniteCheckedException;
+
+    /**
+     * Creates the task output for the cases that are not resolved internally
+     * (see {@code createOutputInternal}).
+     *
+     * @param ctx Task context.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) 
throws IgniteCheckedException;
+
+    /**
+     * Resolves the output for the task type currently set on the context. Setup, reduce, commit
+     * and abort tasks have no output. A map task of a job with a combiner writes into a freshly
+     * created combiner input collection; every other case is delegated to
+     * {@link #createOutput(HadoopTaskContext)}.
+     *
+     * @param ctx Task context.
+     * @return Task output, or {@code null} if the task type has none.
+     * @throws IgniteCheckedException If failed.
+     */
+    private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case REDUCE:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            case MAP:
+                // Without a combiner a map task is handled exactly like the default case.
+                if (!job.info().hasCombiner())
+                    return createOutput(ctx);
+
+                assert combinerInput == null;
+
+                if (get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false))
+                    combinerInput = new HadoopHashMultimap(job.info(), mem,
+                        get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024));
+                else
+                    combinerInput = new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
+
+                return combinerInput.startAdding(ctx);
+
+            default:
+                return createOutput(ctx);
+        }
+    }
+
+    /**
+     * Gets the info of the task this runnable was created for.
+     *
+     * @return Task info.
+     */
+    public HadoopTaskInfo taskInfo() {
+        return this.info;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
new file mode 100644
index 0000000..f13c76a
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+
+/**
+ * Common superclass for task executors. Declares the operations to run tasks of a job,
+ * to cancel the tasks of a job and to react to job state changes.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+    /**
+     * Runs tasks.
+     *
+     * @param job Job.
+     * @param tasks Tasks.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> 
tasks) throws IgniteCheckedException;
+
+    /**
+     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method.
+     * No more job submissions will be performed via
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method
+     * for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void cancelTasks(HadoopJobId jobId) throws 
IgniteCheckedException;
+
+    /**
+     * Callback invoked on job state change.
+     *
+     * @param meta Job metadata.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void onJobStateChanged(HadoopJobMetadata meta) throws 
IgniteCheckedException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
new file mode 100644
index 0000000..b22d291
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+/**
+ * State of the task.
+ */
+public enum HadoopTaskState {
+    /** Running task. */
+    RUNNING,
+
+    /** Completed task. */
+    COMPLETED,
+
+    /** Failed task. */
+    FAILED,
+
+    /** Canceled task. */
+    CANCELED,
+
+    /** Process crashed. */
+    CRASHED
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
new file mode 100644
index 0000000..fa09ff7
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task status: the task state together with an optional failure cause and optional counters.
+ * <p>
+ * Instances are {@link Externalizable}; {@link #writeExternal(ObjectOutput)} and
+ * {@link #readExternal(ObjectInput)} must keep the same field order.
+ */
+public class HadoopTaskStatus implements Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Task state. */
+    private HadoopTaskState state;
+
+    /** Failure cause, {@code null} if none was supplied. */
+    private Throwable failCause;
+
+    /** Task counters, {@code null} if none were supplied. */
+    private HadoopCounters cntrs;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopTaskStatus() {
+        // No-op.
+    }
+
+    /**
+     * Creates new instance without counters.
+     *
+     * @param state Task state.
+     * @param failCause Failure cause (if any).
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable 
failCause) {
+        this(state, failCause, null);
+    }
+
+    /**
+     * Creates new instance.
+     *
+     * @param state Task state, must not be {@code null} (checked by assertion only).
+     * @param failCause Failure cause (if any).
+     * @param cntrs Task counters.
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable 
failCause,
+        @Nullable HadoopCounters cntrs) {
+        assert state != null;
+
+        this.state = state;
+        this.failCause = failCause;
+        this.cntrs = cntrs;
+    }
+
+    /**
+     * @return State.
+     */
+    public HadoopTaskState state() {
+        return state;
+    }
+
+    /**
+     * @return Fail cause, or {@code null} if there is none.
+     */
+    @Nullable public Throwable failCause() {
+        return failCause;
+    }
+
+    /**
+     * @return Counters, or {@code null} if there are none.
+     */
+    @Nullable public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTaskStatus.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Order is significant: readExternal() reads state, failure cause and counters in this sequence.
+        out.writeObject(state);
+        out.writeObject(failCause);
+        out.writeObject(cntrs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        // Mirrors the write order used by writeExternal().
+        state = (HadoopTaskState)in.readObject();
+        failCause = (Throwable)in.readObject();
+        cntrs = (HadoopCounters)in.readObject();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
new file mode 100644
index 0000000..dc5874d
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -0,0 +1,976 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.HadoopExternalProcessStarter;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener;
+import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.IgnitePortProtocol;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+import static 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED;
+import static 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED;
+
+/**
+ * External process registry. Handles external process lifecycle.
+ */
+public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Hadoop context. */
+    private HadoopContext ctx;
+
+    /** Path to the java executable used to launch child processes. */
+    private String javaCmd;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Node process descriptor. */
+    private HadoopProcessDescriptor nodeDesc;
+
+    /** Output base. */
+    private File outputBase;
+
+    /** Path separator. */
+    private String pathSep;
+
+    /** Hadoop external communication. */
+    private HadoopExternalCommunication comm;
+
+    /** Running processes by child process ID. */
+    private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = 
new ConcurrentHashMap8<>();
+
+    /** Running processes by job ID. */
+    private final ConcurrentMap<HadoopJobId, HadoopProcess> 
runningProcsByJobId = new ConcurrentHashMap8<>();
+
+    /** Busy lock. */
+    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resolves the output directory and the java command, starts the external communication component and
+     * registers the ports it listens on.
+     */
+    @Override public void start(HadoopContext ctx) throws 
IgniteCheckedException {
+        this.ctx = ctx;
+
+        log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
+
+        // Base directory for per-job output folders (see jobWorkFolder()).
+        outputBase = U.resolveWorkDirectory("hadoop", false);
+
+        pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : 
":");
+
+        // Fails startup early if the java executable cannot be run.
+        initJavaCommand();
+
+        comm = new HadoopExternalCommunication(
+            ctx.localNodeId(),
+            UUID.randomUUID(),
+            ctx.kernalContext().config().getMarshaller(),
+            log,
+            ctx.kernalContext().getSystemExecutorService(),
+            ctx.kernalContext().gridName());
+
+        comm.setListener(new MessageListener());
+
+        comm.start();
+
+        // The local descriptor is read only after the communication component has been started.
+        nodeDesc = comm.localProcessDescriptor();
+
+        ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), 
IgnitePortProtocol.TCP,
+            HadoopExternalTaskExecutor.class);
+
+        // -1 is treated as "no shared memory port"; in that case nothing is registered.
+        if (nodeDesc.sharedMemoryPort() != -1)
+            
ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), 
IgnitePortProtocol.TCP,
+                HadoopExternalTaskExecutor.class);
+
+        jobTracker = ctx.jobTracker();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops the external communication component; a failure to stop it is logged and otherwise ignored.
+     */
+    @Override public void stop(boolean cancel) {
+        // NOTE(review): the write lock is acquired here and never released in this method. Presumably this is
+        // intentional so that every later busyLock.tryReadLock() fails after stop - confirm.
+        busyLock.writeLock();
+
+        try {
+            comm.stop();
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to gracefully stop external hadoop 
communication server (will shutdown anyway)", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * When no local process exists for the job, one is started if this node participates in the job. Otherwise
+     * the existing process is either terminated (job phase is complete) or sent a job info update, waiting for
+     * process initialization first if it has not finished yet.
+     */
+    @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
+        final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
+
+        // No local process for this job: start one if this node participates, otherwise there is nothing to do.
+        if (proc == null) {
+            if (!ctx.isParticipating(meta))
+                return;
+
+            HadoopJob job;
+
+            try {
+                job = jobTracker.job(meta.jobId(), meta.jobInfo());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to get job: " + meta.jobId(), e);
+
+                return;
+            }
+
+            startProcess(job, meta.mapReducePlan());
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
+
+        // Job is complete: unregister the process from both maps and kill it.
+        if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+            if (log.isDebugEnabled())
+                log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
+                    ", proc=" + proc + ']');
+
+            runningProcsByJobId.remove(meta.jobId());
+            runningProcsByProcId.remove(proc.descriptor().processId());
+
+            proc.terminate();
+
+            return;
+        }
+
+        // Initialization still in progress: postpone the update until the init future completes.
+        if (!proc.initFut.isDone()) {
+            proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+                @Override public void apply(
+                    IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> initRes) {
+                    try {
+                        initRes.get();
+
+                        sendJobInfoUpdate(proc, meta);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to initialize child process (will skip job state notification) " +
+                                "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
+                    }
+                }
+            });
+
+            return;
+        }
+
+        // Initialization has finished: notify the process only if it initialized successfully.
+        if (!proc.initFut.isFailed())
+            sendJobInfoUpdate(proc, meta);
+        else if (log.isDebugEnabled())
+            log.debug("Failed to initialize child process (will skip job state notification) " +
+                "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Does nothing if the busy lock cannot be read-locked. For SETUP, ABORT and COMMIT tasks a new child process
+     * is started when none is registered for the job or the registered one is terminated. The execution request
+     * itself is sent from a listener on the process init future.
+     */
+    @SuppressWarnings("ConstantConditions")
+    @Override public void run(final HadoopJob job, final 
Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock()) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to start hadoop tasks (grid is stopping, 
will ignore).");
+
+            return;
+        }
+
+        try {
+            HadoopProcess proc = runningProcsByJobId.get(job.id());
+
+            // The type of the first task is used for the whole batch.
+            // NOTE(review): presumably 'tasks' is never empty and all tasks share one type - confirm with callers.
+            HadoopTaskType taskType = F.first(tasks).type();
+
+            if (taskType == HadoopTaskType.SETUP || taskType == 
HadoopTaskType.ABORT ||
+                taskType == HadoopTaskType.COMMIT) {
+                if (proc == null || proc.terminated()) {
+                    // Drop the stale mapping first: startProcess() asserts that no process is registered for the job.
+                    runningProcsByJobId.remove(job.id(), proc);
+
+                    // Start new process for ABORT task since previous processes were killed.
+                    proc = startProcess(job, jobTracker.plan(job.id()));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Starting new process for maintenance task 
[jobId=" + job.id() +
+                            ", proc=" + proc + ", taskType=" + taskType + ']');
+                }
+            }
+            else
+                // NOTE(review): a missing process is guarded by this assertion only; with assertions disabled
+                // the dereference of 'proc' below would throw NullPointerException.
+                assert proc != null : "Missing started process for task 
execution request: " + job.id() +
+                    ", tasks=" + tasks;
+
+            final HadoopProcess proc0 = proc;
+
+            proc.initFut.listen(new 
CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+                @Override public void apply(
+                    IgniteInternalFuture<IgniteBiTuple<Process, 
HadoopProcessDescriptor>> f) {
+                    // The listener takes the busy lock again on its own; if that fails the tasks are dropped.
+                    if (!busyLock.tryReadLock())
+                        return;
+
+                    try {
+                        // Throws if process initialization failed; handled by the catch block below.
+                        f.get();
+
+                        proc0.addTasks(tasks);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Sending task execution request to child 
process [jobId=" + job.id() +
+                                ", proc=" + proc0 + ", tasks=" + tasks + ']');
+
+                        sendExecutionRequest(proc0, job, tasks);
+                    }
+                    catch (IgniteCheckedException e) {
+                        notifyTasksFailed(tasks, FAILED, e);
+                    }
+                    finally {
+                        busyLock.readUnlock();
+                    }
+                }
+            });
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Terminates the child process registered for the job, if there is one.
+     */
+    @Override public void cancelTasks(HadoopJobId jobId) {
+        HadoopProcess jobProc = runningProcsByJobId.get(jobId);
+
+        // Nothing is registered for this job on the local node.
+        if (jobProc == null)
+            return;
+
+        jobProc.terminate();
+    }
+
+    /**
+     * Sends execution request to remote node.
+     * <p>
+     * If the process is already terminated, the tasks are reported to the job tracker as
+     * {@code CRASHED} and nothing is sent.
+     *
+     * @param proc Process to send request to.
+     * @param job Job instance.
+     * @param tasks Collection of tasks to execute in started process.
+     * @throws IgniteCheckedException If sending the request fails.
+     */
+    private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, 
Collection<HadoopTaskInfo> tasks)
+        throws IgniteCheckedException {
+        // Must synchronize since concurrent process crash may happen and will
+        // receive onConnectionLost().
+        proc.lock();
+
+        try {
+            if (proc.terminated()) {
+                notifyTasksFailed(tasks, CRASHED, null);
+
+                return;
+            }
+
+            HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
+
+            req.jobId(job.id());
+            req.jobInfo(job.info());
+            req.tasks(tasks);
+
+            comm.sendMessage(proc.descriptor(), req);
+        }
+        finally {
+            proc.unlock();
+        }
+    }
+
+    /**
+     * Builds start metadata for a child process: the classpath of the current JVM split into entries and a
+     * fixed set of JVM options.
+     *
+     * @return External task metadata.
+     */
+    private HadoopExternalTaskMetadata buildTaskMeta() {
+        HadoopExternalTaskMetadata res = new HadoopExternalTaskMetadata();
+
+        // The child process gets the same classpath entries as the current JVM.
+        String[] cpEntries = System.getProperty("java.class.path").split(File.pathSeparator);
+
+        res.classpath(Arrays.asList(cpEntries));
+
+        res.jvmOptions(Arrays.asList(
+            "-Xmx1g",
+            "-ea",
+            "-XX:+UseConcMarkSweepGC",
+            "-XX:+CMSClassUnloadingEnabled",
+            "-DIGNITE_HOME=" + U.getIgniteHome()));
+
+        return res;
+    }
+
+    /**
+     * Reports every given task to the job tracker as finished with the given failure state.
+     *
+     * @param tasks Tasks to notify about.
+     * @param state Fail state.
+     * @param e Optional error.
+     */
+    private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
+        // One status instance is shared by all notified tasks.
+        HadoopTaskStatus failStatus = new HadoopTaskStatus(state, e);
+
+        for (HadoopTaskInfo taskInfo : tasks) {
+            jobTracker.onTaskFinished(taskInfo, failStatus);
+        }
+    }
+
+    /**
+     * Starts process template that will be ready to execute Hadoop tasks.
+     * <p>
+     * The process is registered in both process maps right away. The child JVM is launched asynchronously and
+     * its output is read line by line until it prints {@code "Started"} or {@code "Failed"}. If the output ends
+     * before either line is seen, the init future is failed so that its listeners are not left waiting.
+     *
+     * @param job Job instance.
+     * @param plan Map reduce plan.
+     * @return Registered process whose init future tracks the start of the child JVM.
+     */
+    private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) {
+        final UUID childProcId = UUID.randomUUID();
+
+        HadoopJobId jobId = job.id();
+
+        final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId);
+
+        final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
+
+        HadoopProcess old = runningProcsByJobId.put(jobId, proc);
+
+        assert old == null;
+
+        old = runningProcsByProcId.put(childProcId, proc);
+
+        assert old == null;
+
+        ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+            @Override public void run() {
+                if (!busyLock.tryReadLock()) {
+                    fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
+
+                    return;
+                }
+
+                try {
+                    HadoopExternalTaskMetadata startMeta = buildTaskMeta();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Created hadoop child process metadata for job [job=" + job +
+                            ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
+
+                    Process javaProc = startJavaProcess(childProcId, startMeta, job);
+
+                    // The reader is intentionally left open: closing it would close the child's output stream.
+                    BufferedReader rdr = new BufferedReader(new InputStreamReader(javaProc.getInputStream()));
+
+                    String line;
+
+                    // Set once the child has printed a definite start result ("Started" or "Failed").
+                    boolean reported = false;
+
+                    // Read up all the process output.
+                    while ((line = rdr.readLine()) != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Tracing process output: " + line);
+
+                        if ("Started".equals(line)) {
+                            // Process started successfully, it should not write anything more to the output stream.
+                            if (log.isDebugEnabled())
+                                log.debug("Successfully started child process [childProcId=" + childProcId +
+                                    ", meta=" + job + ']');
+
+                            fut.onProcessStarted(javaProc);
+
+                            reported = true;
+
+                            break;
+                        }
+                        else if ("Failed".equals(line)) {
+                            StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
+
+                            // Everything printed after "Failed" is the failure description.
+                            while ((line = rdr.readLine()) != null)
+                                sb.append("    ").append(line).append("\n");
+
+                            // Cut last character.
+                            sb.setLength(sb.length() - 1);
+
+                            log.warning(sb.toString());
+
+                            fut.onDone(new IgniteCheckedException(sb.toString()));
+
+                            reported = true;
+
+                            break;
+                        }
+                    }
+
+                    // Output ended without a start report (e.g. the child exited early): fail the future,
+                    // otherwise it would never complete and its listeners would wait forever.
+                    if (!reported)
+                        fut.onDone(new IgniteCheckedException("Child process output ended before process start " +
+                            "was reported [childProcId=" + childProcId + ", job=" + job + ']'));
+                }
+                catch (Throwable e) {
+                    fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+                finally {
+                    busyLock.readUnlock();
+                }
+            }
+        }, true);
+
+        fut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+            @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+                try {
+                    // Make sure there were no exceptions.
+                    f.get();
+
+                    prepareForJob(proc, job, plan);
+                }
+                catch (IgniteCheckedException ignore) {
+                    // Exception is printed in future's onDone() method.
+                }
+            }
+        });
+
+        return proc;
+    }
+
+    /**
+     * Checks that java local command is available.
+     * <p>
+     * Resolves the java executable from the {@code java.home} system property, falling back to the
+     * {@code JAVA_HOME} environment variable, then runs it with {@code -version} and requires a zero exit code.
+     *
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    private void initJavaCommand() throws IgniteCheckedException {
+        String javaHome = System.getProperty("java.home");
+
+        if (javaHome == null)
+            javaHome = System.getenv("JAVA_HOME");
+
+        if (javaHome == null)
+            throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
+
+        javaCmd = javaHome + File.separator + "bin" + File.separator + 
(U.isWindows() ? "java.exe" : "java");
+
+        try {
+            // Error stream is merged into the output so that a single reader sees everything the command prints.
+            Process proc = new ProcessBuilder(javaCmd, 
"-version").redirectErrorStream(true).start();
+
+            // Output is drained before waiting for the exit code.
+            Collection<String> out = readProcessOutput(proc);
+
+            int res = proc.waitFor();
+
+            if (res != 0)
+                throw new IgniteCheckedException("Failed to execute 'java 
-version' command (process finished with nonzero " +
+                    "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', 
msg=" + F.first(out) + ']');
+
+            if (log.isInfoEnabled()) {
+                log.info("Will use java for external task execution: ");
+
+                for (String s : out)
+                    log.info("    " + s);
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to check java for 
external task execution.", e);
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag before converting to a checked exception.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteCheckedException("Failed to wait for process 
completion (thread got interrupted).", e);
+        }
+    }
+
+    /**
+     * Reads process output line-by-line until the end of the stream and closes the stream afterwards.
+     *
+     * @param proc Process to read output.
+     * @return Read lines.
+     * @throws IOException If read failed.
+     */
+    private Collection<String> readProcessOutput(Process proc) throws IOException {
+        Collection<String> res = new ArrayList<>();
+
+        // Closing the reader also closes the underlying process output stream, on success and on failure alike.
+        try (BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
+            String s;
+
+            while ((s = rdr.readLine()) != null)
+                res.add(s);
+        }
+
+        return res;
+    }
+
+    /**
+     * Builds process from metadata.
+     * <p>
+     * Assembles the command line (java command, JVM options, classpath, main class
+     * {@link HadoopExternalProcessStarter} and its arguments) and starts it with the error stream merged into
+     * the output stream.
+     *
+     * @param childProcId Child process ID.
+     * @param startMeta Metadata.
+     * @param job Job.
+     * @return Started process.
+     * @throws Exception If the work directory could not be resolved or the process could not be started.
+     */
+    private Process startJavaProcess(UUID childProcId, 
HadoopExternalTaskMetadata startMeta,
+        HadoopJob job) throws Exception {
+        String outFldr = jobWorkFolder(job.id()) + File.separator + 
childProcId;
+
+        if (log.isDebugEnabled())
+            log.debug("Will write process log output to: " + outFldr);
+
+        List<String> cmd = new ArrayList<>();
+
+        File workDir = U.resolveWorkDirectory("", false);
+
+        cmd.add(javaCmd);
+        cmd.addAll(startMeta.jvmOptions());
+        cmd.add("-cp");
+        cmd.add(buildClasspath(startMeta.classpath()));
+        cmd.add(HadoopExternalProcessStarter.class.getName());
+        // Arguments for the starter class: child and parent process IDs, parent node ID and its endpoints.
+        cmd.add("-cpid");
+        cmd.add(String.valueOf(childProcId));
+        cmd.add("-ppid");
+        cmd.add(String.valueOf(nodeDesc.processId()));
+        cmd.add("-nid");
+        cmd.add(String.valueOf(nodeDesc.parentNodeId()));
+        cmd.add("-addr");
+        cmd.add(nodeDesc.address());
+        cmd.add("-tport");
+        cmd.add(String.valueOf(nodeDesc.tcpPort()));
+        cmd.add("-sport");
+        cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
+        // Output folder and work directory for the child.
+        cmd.add("-out");
+        cmd.add(outFldr);
+        cmd.add("-wd");
+        cmd.add(workDir.getAbsolutePath());
+
+        return new ProcessBuilder(cmd)
+            .redirectErrorStream(true)
+            .directory(workDir)
+            .start();
+    }
+
+    /**
+     * Gets job work folder: a {@code Job_<jobId>} entry under the output base.
+     *
+     * @param jobId Job ID.
+     * @return Job work folder.
+     */
+    private String jobWorkFolder(HadoopJobId jobId) {
+        String jobDirName = "Job_" + jobId;
+
+        return outputBase + File.separator + jobDirName;
+    }
+
+    /**
+     * Joins classpath entries into a single string using the configured path separator.
+     * <p>
+     * The separator is placed between entries only, so the result is correct for a separator of any length
+     * and an empty collection yields an empty string.
+     *
+     * @param cp Classpath collection, expected to be non-empty (checked by assertion only).
+     * @return Classpath string.
+     */
+    private String buildClasspath(Collection<String> cp) {
+        assert !cp.isEmpty();
+
+        StringBuilder sb = new StringBuilder();
+
+        boolean first = true;
+
+        for (String s : cp) {
+            // Separator goes before every entry except the first one.
+            if (first)
+                first = false;
+            else
+                sb.append(pathSep);
+
+            sb.append(s);
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Sends job info update request to remote process.
+     * <p>
+     * Reducer addresses are included only when an address is known for every reducer of the plan; otherwise
+     * {@code null} is sent in their place. If sending fails and the process is not terminated yet, the
+     * process is terminated.
+     *
+     * @param proc Process to send request to.
+     * @param meta Job metadata.
+     */
+    private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) 
{
+        Map<Integer, HadoopProcessDescriptor> rdcAddrs = 
meta.reducersAddresses();
+
+        int rdcNum = meta.mapReducePlan().reducers();
+
+        HadoopProcessDescriptor[] addrs = null;
+
+        // The address array is built only once the address map is complete.
+        if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
+            addrs = new HadoopProcessDescriptor[rdcNum];
+
+            // Reducer indexes 0..rdcNum-1 are used as keys of the address map.
+            for (int i = 0; i < rdcNum; i++) {
+                HadoopProcessDescriptor desc = rdcAddrs.get(i);
+
+                assert desc != null : "Missing reducing address [meta=" + meta 
+ ", rdc=" + i + ']';
+
+                addrs[i] = desc;
+            }
+        }
+
+        try {
+            comm.sendMessage(proc.descriptor(), new 
HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+        }
+        catch (IgniteCheckedException e) {
+            // A send failure to an already terminated process is ignored silently.
+            if (!proc.terminated()) {
+                log.error("Failed to send job state update message to remote 
child process (will kill the process) " +
+                    "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
+
+                proc.terminate();
+            }
+        }
+    }
+
+    /**
+     * Sends prepare request to remote process. If sending fails, the error is logged and the process is
+     * terminated.
+     *
+     * @param proc Process to send request to.
+     * @param job Job.
+     * @param plan Map reduce plan.
+     */
+    private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) {
+        try {
+            HadoopProcessDescriptor dest = proc.descriptor();
+
+            // Carries the job ID and info, the total reducer count and the reducers planned for the local node.
+            HadoopPrepareForJobRequest req = new HadoopPrepareForJobRequest(job.id(), job.info(),
+                plan.reducers(), plan.reducers(ctx.localNodeId()));
+
+            comm.sendMessage(dest, req);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
+                ", plan=" + plan + ']', e);
+
+            proc.terminate();
+        }
+    }
+
+    /**
+     * Processes task finished message: removes the task from the sending process, if that process is still
+     * registered, and passes the task status on to the job tracker.
+     *
+     * @param desc Remote process descriptor.
+     * @param taskMsg Task finished message.
+     */
+    private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
+        HadoopProcess sender = runningProcsByProcId.get(desc.processId());
+
+        // The job tracker is notified even when the sending process is no longer registered.
+        if (sender != null) {
+            sender.removeTask(taskMsg.taskInfo());
+        }
+
+        jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
+    }
+
+    /**
+     * Listener for messages and connection-loss events coming from child processes. Both callbacks do
+     * nothing when the busy lock cannot be read-locked.
+     */
+    private class MessageListener implements HadoopMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(HadoopProcessDescriptor desc, 
HadoopMessage msg) {
+            if (!busyLock.tryReadLock())
+                return;
+
+            try {
+                if (msg instanceof HadoopProcessStartedAck) {
+                    // Start acknowledgement: pass the sender descriptor to the init future of its process.
+                    HadoopProcess proc = 
runningProcsByProcId.get(desc.processId());
+
+                    assert proc != null : "Missing child process for 
processId: " + desc;
+
+                    HadoopProcessFuture fut = proc.initFut;
+
+                    if (fut != null)
+                        fut.onReplyReceived(desc);
+                    // Safety.
+                    else
+                        log.warning("Failed to find process start future (will 
ignore): " + desc);
+                }
+                else if (msg instanceof HadoopTaskFinishedMessage) {
+                    HadoopTaskFinishedMessage taskMsg = 
(HadoopTaskFinishedMessage)msg;
+
+                    processTaskFinishedMessage(desc, taskMsg);
+                }
+                else
+                    log.warning("Unexpected message received by node [desc=" + 
desc + ", msg=" + msg + ']');
+            }
+            finally {
+                busyLock.readUnlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+            if (!busyLock.tryReadLock())
+                return;
+
+            try {
+                // A null descriptor is reported as a failed handshake; there is no process to look up.
+                if (desc == null) {
+                    U.warn(log, "Handshake failed.");
+
+                    return;
+                }
+
+                // Notify job tracker about failed tasks.
+                HadoopProcess proc = 
runningProcsByProcId.get(desc.processId());
+
+                if (proc != null) {
+                    Collection<HadoopTaskInfo> tasks = proc.tasks();
+
+                    // Tasks still assigned to the process are reported as CRASHED.
+                    if (!F.isEmpty(tasks)) {
+                        log.warning("Lost connection with alive process (will 
terminate): " + desc);
+
+                        HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
+                            new IgniteCheckedException("Failed to run tasks 
(external process finished unexpectedly): " + desc));
+
+                        for (HadoopTaskInfo info : tasks)
+                            jobTracker.onTaskFinished(info, status);
+
+                        // NOTE(review): only the job ID mapping is removed here, and only when tasks were pending;
+                        // the entry in runningProcsByProcId is left in place - confirm this is intended.
+                        runningProcsByJobId.remove(proc.jobId(), proc);
+                    }
+
+                    // Safety.
+                    proc.terminate();
+                }
+            }
+            finally {
+                busyLock.readUnlock();
+            }
+        }
+    }
+
+    /**
+     * Hadoop process. Bookkeeping record for one external child process: the job it serves,
+     * its init future, the tasks handed to it and the reducers planned for it.
+     * <p>
+     * The record itself is a {@link ReentrantLock}; {@link #terminate()} takes it to guard
+     * against concurrent message sending.
+     */
+    private static class HadoopProcess extends ReentrantLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Job ID. */
+        private final HadoopJobId jobId;
+
+        /** Process. Stays {@code null} until {@link #onInitialized(Process, HadoopProcessDescriptor)} is called. */
+        private Process proc;
+
+        /** Init future. Completes when process is ready to receive messages. */
+        private final HadoopProcessFuture initFut;
+
+        /** Process descriptor. Stays {@code null} until initialization callback is invoked. */
+        private HadoopProcessDescriptor procDesc;
+
+        /** Reducers planned for this process ({@code null} if none were passed). */
+        private Collection<Integer> reducers;
+
+        /** Tasks. */
+        private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
+
+        /** Terminated flag. */
+        private volatile boolean terminated;
+
+        /**
+         * @param jobId Job ID.
+         * @param initFut Init future.
+         * @param reducers Reducers planned for this process, may be {@code null} or empty.
+         */
+        private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut,
+            int[] reducers) {
+            this.jobId = jobId;
+            this.initFut = initFut;
+
+            if (!F.isEmpty(reducers)) {
+                this.reducers = new ArrayList<>(reducers.length);
+
+                for (int r : reducers)
+                    this.reducers.add(r);
+            }
+        }
+
+        /**
+         * @return Communication process descriptor.
+         */
+        private HadoopProcessDescriptor descriptor() {
+            return procDesc;
+        }
+
+        /**
+         * @return Job ID.
+         */
+        public HadoopJobId jobId() {
+            return jobId;
+        }
+
+        /**
+         * Initialized callback.
+         *
+         * @param proc Java process representation.
+         * @param procDesc Process descriptor.
+         */
+        private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
+            this.proc = proc;
+            this.procDesc = procDesc;
+        }
+
+        /**
+         * Terminates process (kills it). If initialization is still in progress, the kill is
+         * deferred until the init future completes.
+         */
+        private void terminate() {
+            // Guard against concurrent message sending.
+            lock();
+
+            try {
+                terminated = true;
+
+                if (!initFut.isDone())
+                    initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+                        @Override public void apply(
+                            IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+                            HadoopProcess.this.destroyProcess();
+                        }
+                    });
+                else
+                    destroyProcess();
+            }
+            finally {
+                unlock();
+            }
+        }
+
+        /**
+         * Destroys the underlying OS process if one was ever attached. The process reference is
+         * only set by {@link #onInitialized(Process, HadoopProcessDescriptor)}, so it is still
+         * {@code null} when initialization failed; in that case there is nothing to kill.
+         */
+        private void destroyProcess() {
+            Process p = proc;
+
+            if (p != null)
+                p.destroy();
+        }
+
+        /**
+         * @return Terminated flag.
+         */
+        private boolean terminated() {
+            return terminated;
+        }
+
+        /**
+         * Adds tasks to the set of tasks tracked for this process.
+         *
+         * @param tasks Tasks to add.
+         */
+        private void addTasks(Collection<HadoopTaskInfo> tasks) {
+            this.tasks.addAll(tasks);
+        }
+
+        /**
+         * Removes task when it was completed.
+         *
+         * @param task Task to remove.
+         */
+        private void removeTask(HadoopTaskInfo task) {
+            tasks.remove(task);
+        }
+
+        /**
+         * @return Collection of tasks.
+         */
+        private Collection<HadoopTaskInfo> tasks() {
+            return tasks;
+        }
+
+        /**
+         * @return Planned reducers, or {@code null} if none were passed to the constructor.
+         */
+        private Collection<Integer> reducers() {
+            return reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HadoopProcess.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private class HadoopProcessFuture extends 
GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Child process ID. */
+        private UUID childProcId;
+
+        /** Job ID. */
+        private HadoopJobId jobId;
+
+        /** Process descriptor. */
+        private HadoopProcessDescriptor desc;
+
+        /** Running process. */
+        private Process proc;
+
+        /** Process started flag. */
+        private volatile boolean procStarted;
+
+        /** Reply received flag. */
+        private volatile boolean replyReceived;
+
+        /** Logger. */
+        private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
+
+        /**
+         */
+        private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId) {
+            this.childProcId = childProcId;
+            this.jobId = jobId;
+        }
+
+        /**
+         * Process started callback.
+         */
+        public void onProcessStarted(Process proc) {
+            this.proc = proc;
+
+            procStarted = true;
+
+            if (procStarted && replyReceived)
+                onDone(F.t(proc, desc));
+        }
+
+        /**
+         * Reply received callback.
+         */
+        public void onReplyReceived(HadoopProcessDescriptor desc) {
+            assert childProcId.equals(desc.processId());
+
+            this.desc = desc;
+
+            replyReceived = true;
+
+            if (procStarted && replyReceived)
+                onDone(F.t(proc, desc));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable IgniteBiTuple<Process, 
HadoopProcessDescriptor> res,
+            @Nullable Throwable err) {
+            if (err == null) {
+                HadoopProcess proc = runningProcsByProcId.get(childProcId);
+
+                assert proc != null;
+
+                assert proc.initFut == this;
+
+                proc.onInitialized(res.get1(), res.get2());
+
+                if (!F.isEmpty(proc.reducers()))
+                    jobTracker.onExternalMappersInitialized(jobId, 
proc.reducers(), desc);
+            }
+            else {
+                // Clean up since init failed.
+                runningProcsByJobId.remove(jobId);
+                runningProcsByProcId.remove(childProcId);
+            }
+
+            if (super.onDone(res, err)) {
+                if (err == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Initialized child process for external task 
execution [jobId=" + jobId +
+                            ", desc=" + desc + ", initTime=" + duration() + 
']');
+                }
+                else
+                    U.error(log, "Failed to initialize child process for 
external task execution [jobId=" + jobId +
+                        ", desc=" + desc + ']', err);
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
new file mode 100644
index 0000000..27b0329
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.util.Collection;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Holder of the settings required to launch an external task process: its classpath entries
+ * and its JVM options.
+ */
+public class HadoopExternalTaskMetadata {
+    /** Process classpath. */
+    private Collection<String> classpath;
+
+    /** JVM options. */
+    @GridToStringInclude
+    private Collection<String> jvmOpts;
+
+    /**
+     * Gets process classpath.
+     *
+     * @return Classpath.
+     */
+    public Collection<String> classpath() {
+        return this.classpath;
+    }
+
+    /**
+     * Sets process classpath.
+     *
+     * @param classpath Classpath.
+     */
+    public void classpath(Collection<String> classpath) {
+        this.classpath = classpath;
+    }
+
+    /**
+     * Gets JVM options.
+     *
+     * @return JVM Options.
+     */
+    public Collection<String> jvmOptions() {
+        return this.jvmOpts;
+    }
+
+    /**
+     * Sets JVM options.
+     *
+     * @param jvmOpts JVM options.
+     */
+    public void jvmOptions(Collection<String> jvmOpts) {
+        this.jvmOpts = jvmOpts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopExternalTaskMetadata.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
new file mode 100644
index 0000000..96b3675
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Job info update request.
+ */
+public class HadoopJobInfoUpdateRequest implements HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /** Job phase. */
+    @GridToStringInclude
+    private HadoopJobPhase jobPhase;
+
+    /** Reducers addresses. */
+    @GridToStringInclude
+    private HadoopProcessDescriptor[] reducersAddrs;
+
+    /**
+     * Constructor required by {@link Externalizable}.
+     */
+    public HadoopJobInfoUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param jobId Job ID.
+     * @param jobPhase Job phase.
+     * @param reducersAddrs Reducers addresses.
+     */
+    public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase 
jobPhase,
+        HadoopProcessDescriptor[] reducersAddrs) {
+        assert jobId != null;
+
+        this.jobId = jobId;
+        this.jobPhase = jobPhase;
+        this.reducersAddrs = reducersAddrs;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @return Job phase.
+     */
+    public HadoopJobPhase jobPhase() {
+        return jobPhase;
+    }
+
+    /**
+     * @return Reducers addresses.
+     */
+    public HadoopProcessDescriptor[] reducersAddresses() {
+        return reducersAddrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+
+        out.writeObject(jobPhase);
+        U.writeArray(out, reducersAddrs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        jobId = new HadoopJobId();
+        jobId.readExternal(in);
+
+        jobPhase = (HadoopJobPhase)in.readObject();
+        reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopJobInfoUpdateRequest.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
new file mode 100644
index 0000000..43bdc36
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Request asking a child process to prepare for a job. Carries the job ID, the job info,
+ * the total reducer count and the reducers assigned to the current node.
+ */
+public class HadoopPrepareForJobRequest implements HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /** Job info. */
+    @GridToStringInclude
+    private HadoopJobInfo jobInfo;
+
+    /** Total amount of reducers in the job. */
+    @GridToStringInclude
+    private int totalReducersCnt;
+
+    /** Reducers to be executed on current node. */
+    @GridToStringInclude
+    private int[] locReducers;
+
+    /**
+     * Empty constructor, needed for {@link Externalizable} support.
+     */
+    public HadoopPrepareForJobRequest() {
+        // No-op.
+    }
+
+    /**
+     * Creates a fully populated request.
+     *
+     * @param jobId Job ID (asserted to be non-null).
+     * @param jobInfo Job info.
+     * @param totalReducersCnt Number of reducers in the job.
+     * @param locReducers Reducers to be executed on current node.
+     */
+    public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt,
+        int[] locReducers) {
+        assert jobId != null;
+
+        this.jobId = jobId;
+        this.jobInfo = jobInfo;
+        this.totalReducersCnt = totalReducersCnt;
+        this.locReducers = locReducers;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return this.jobId;
+    }
+
+    /**
+     * @return Job info.
+     */
+    public HadoopJobInfo jobInfo() {
+        return this.jobInfo;
+    }
+
+    /**
+     * @return Number of reducers in job.
+     */
+    public int totalReducerCount() {
+        return this.totalReducersCnt;
+    }
+
+    /**
+     * @return Reducers to be executed on current node.
+     */
+    public int[] localReducers() {
+        return this.locReducers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Field order here must mirror readExternal().
+        jobId.writeExternal(out);
+
+        out.writeObject(jobInfo);
+        out.writeInt(totalReducersCnt);
+
+        U.writeIntArray(out, locReducers);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // Field order here must mirror writeExternal().
+        HadoopJobId id = new HadoopJobId();
+
+        jobId = id;
+
+        id.readExternal(in);
+
+        jobInfo = (HadoopJobInfo)in.readObject();
+        totalReducersCnt = in.readInt();
+
+        locReducers = U.readIntArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopPrepareForJobRequest.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
new file mode 100644
index 0000000..2dc233b
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Process descriptor used to identify process for which task is running.
+ * <p>
+ * Identity ({@link #equals(Object)} / {@link #hashCode()}) is defined by the parent node ID and
+ * the process ID only; address and ports do not take part in it.
+ */
+public class HadoopProcessDescriptor implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Parent node ID. */
+    private UUID parentNodeId;
+
+    /** Process ID. */
+    private UUID procId;
+
+    /** Address. */
+    private String addr;
+
+    /** TCP port. */
+    private int tcpPort;
+
+    /** Shared memory port. */
+    private int shmemPort;
+
+    /**
+     * @param parentNodeId Parent node ID.
+     * @param procId Process ID.
+     */
+    public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) {
+        this.parentNodeId = parentNodeId;
+        this.procId = procId;
+    }
+
+    /**
+     * Gets process ID.
+     *
+     * @return Process ID.
+     */
+    public UUID processId() {
+        return procId;
+    }
+
+    /**
+     * Gets parent node ID.
+     *
+     * @return Parent node ID.
+     */
+    public UUID parentNodeId() {
+        return parentNodeId;
+    }
+
+    /**
+     * Gets host address.
+     *
+     * @return Host address.
+     */
+    public String address() {
+        return addr;
+    }
+
+    /**
+     * Sets host address.
+     *
+     * @param addr Host address.
+     */
+    public void address(String addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * @return Shared memory port.
+     */
+    public int sharedMemoryPort() {
+        return shmemPort;
+    }
+
+    /**
+     * Sets shared memory port.
+     *
+     * @param shmemPort Shared memory port.
+     */
+    public void sharedMemoryPort(int shmemPort) {
+        this.shmemPort = shmemPort;
+    }
+
+    /**
+     * @return TCP port.
+     */
+    public int tcpPort() {
+        return tcpPort;
+    }
+
+    /**
+     * Sets TCP port.
+     *
+     * @param tcpPort TCP port.
+     */
+    public void tcpPort(int tcpPort) {
+        this.tcpPort = tcpPort;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof HadoopProcessDescriptor))
+            return false;
+
+        HadoopProcessDescriptor that = (HadoopProcessDescriptor)o;
+
+        return sameId(parentNodeId, that.parentNodeId) && sameId(procId, that.procId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // Same formula as before for non-null IDs; a null ID contributes 0 instead of throwing.
+        int result = parentNodeId != null ? parentNodeId.hashCode() : 0;
+
+        result = 31 * result + (procId != null ? procId.hashCode() : 0);
+
+        return result;
+    }
+
+    /**
+     * Null-safe ID comparison. The constructor does not reject {@code null} IDs, so
+     * identity checks must not dereference them blindly.
+     *
+     * @param a First ID, possibly {@code null}.
+     * @param b Second ID, possibly {@code null}.
+     * @return {@code True} if both are {@code null} or both are equal.
+     */
+    private static boolean sameId(UUID a, UUID b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProcessDescriptor.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
new file mode 100644
index 0000000..b35f3ec
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Acknowledgement message signalling that a process has started. The message declares no
+ * fields, so its serialized form is empty.
+ */
+public class HadoopProcessStartedAck implements HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProcessStartedAck.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // Nothing to read: the message has no state.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Nothing to write: the message has no state.
+    }
+}
\ No newline at end of file

Reply via email to