http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
new file mode 100644
index 0000000..3875304
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Message sent from node to child process to start task(s) execution.
+ */
+public class HadoopTaskExecutionRequest implements HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /** Job info. */
+    @GridToStringInclude
+    private HadoopJobInfo jobInfo;
+
+    /** Mappers. */
+    @GridToStringInclude
+    private Collection<HadoopTaskInfo> tasks;
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @param jobId Job ID.
+     */
+    public void jobId(HadoopJobId jobId) {
+        this.jobId = jobId;
+    }
+
+    /**
+     * @return Jon info.
+     */
+    public HadoopJobInfo jobInfo() {
+        return jobInfo;
+    }
+
+    /**
+     * @param jobInfo Job info.
+     */
+    public void jobInfo(HadoopJobInfo jobInfo) {
+        this.jobInfo = jobInfo;
+    }
+
+    /**
+     * @return Tasks.
+     */
+    public Collection<HadoopTaskInfo> tasks() {
+        return tasks;
+    }
+
+    /**
+     * @param tasks Tasks.
+     */
+    public void tasks(Collection<HadoopTaskInfo> tasks) {
+        this.tasks = tasks;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTaskExecutionRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+
+        out.writeObject(jobInfo);
+        U.writeCollection(out, tasks);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        jobId = new HadoopJobId();
+        jobId.readExternal(in);
+
+        jobInfo = (HadoopJobInfo)in.readObject();
+        tasks = U.readCollection(in);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
new file mode 100644
index 0000000..9e1fdb3
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Task finished message. Sent when local task finishes execution.
+ */
+public class HadoopTaskFinishedMessage implements HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Finished task info. */
+    private HadoopTaskInfo taskInfo;
+
+    /** Task finish status. */
+    private HadoopTaskStatus status;
+
+    /**
+     * Constructor required by {@link Externalizable}.
+     */
+    public HadoopTaskFinishedMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param taskInfo Finished task info.
+     * @param status Task finish status.
+     */
+    public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus 
status) {
+        assert taskInfo != null;
+        assert status != null;
+
+        this.taskInfo = taskInfo;
+        this.status = status;
+    }
+
+    /**
+     * @return Finished task info.
+     */
+    public HadoopTaskInfo taskInfo() {
+        return taskInfo;
+    }
+
+    /**
+     * @return Task finish status.
+     */
+    public HadoopTaskStatus status() {
+        return status;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTaskFinishedMessage.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        taskInfo.writeExternal(out);
+        status.writeExternal(out);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        taskInfo = new HadoopTaskInfo();
+        taskInfo.readExternal(in);
+
+        status = new HadoopTaskStatus();
+        status.readExternal(in);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
new file mode 100644
index 0000000..4a946e9
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob;
+import 
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopRunnableTask;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopJobInfoUpdateRequest;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopPrepareForJobRequest;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessStartedAck;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopTaskExecutionRequest;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopTaskFinishedMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE;
+
+/**
+ * Hadoop process base.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public class HadoopChildProcessRunner {
+    /** Node process descriptor. */
+    private HadoopProcessDescriptor nodeDesc;
+
+    /** Message processing executor service. */
+    private ExecutorService msgExecSvc;
+
+    /** Task executor service. */
+    private HadoopExecutorService execSvc;
+
+    /** */
+    protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+    /** External communication. */
+    private HadoopExternalCommunication comm;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Init guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Start time. */
+    private long startTime;
+
+    /** Init future. */
+    private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
+
+    /** Job instance. */
+    private HadoopJob job;
+
+    /** Number of uncompleted tasks. */
+    private final AtomicInteger pendingTasks = new AtomicInteger();
+
+    /** Shuffle job. */
+    private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
+
+    /** Concurrent mappers. */
+    private int concMappers;
+
+    /** Concurrent reducers. */
+    private int concReducers;
+
+    /**
+     * Starts child process runner.
+     */
+    public void start(HadoopExternalCommunication comm, 
HadoopProcessDescriptor nodeDesc,
+        ExecutorService msgExecSvc, IgniteLogger parentLog)
+        throws IgniteCheckedException {
+        this.comm = comm;
+        this.nodeDesc = nodeDesc;
+        this.msgExecSvc = msgExecSvc;
+
+        comm.setListener(new MessageListener());
+        log = parentLog.getLogger(HadoopChildProcessRunner.class);
+
+        startTime = U.currentTimeMillis();
+
+        // At this point node knows that this process has started.
+        comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
+    }
+
+    /**
+     * Initializes process for task execution.
+     *
+     * @param req Initialization request.
+     */
+    private void prepareProcess(HadoopPrepareForJobRequest req) {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Initializing external hadoop task: " + req);
+
+                assert job == null;
+
+                job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), 
log, null);
+
+                job.initialize(true, nodeDesc.processId());
+
+                shuffleJob = new 
HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
+                    req.totalReducerCount(), req.localReducers());
+
+                initializeExecutors(req);
+
+                if (log.isDebugEnabled())
+                    log.debug("External process initialized [initWaitTime=" +
+                        (U.currentTimeMillis() - startTime) + ']');
+
+                initFut.onDone();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to initialize process: " + req, e);
+
+                initFut.onDone(e);
+            }
+        }
+        else
+            log.warning("Duplicate initialize process request received (will 
ignore): " + req);
+    }
+
+    /**
+     * @param req Task execution request.
+     */
+    private void runTasks(final HadoopTaskExecutionRequest req) {
+        if (!initFut.isDone() && log.isDebugEnabled())
+            log.debug("Will wait for process initialization future completion: 
" + req);
+
+        initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                try {
+                    // Make sure init was successful.
+                    f.get();
+
+                    boolean set = pendingTasks.compareAndSet(0, 
req.tasks().size());
+
+                    assert set;
+
+                    HadoopTaskInfo info = F.first(req.tasks());
+
+                    assert info != null;
+
+                    int size = info.type() == MAP ? concMappers : concReducers;
+
+//                    execSvc.setCorePoolSize(size);
+//                    execSvc.setMaximumPoolSize(size);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Set executor service size for task type 
[type=" + info.type() +
+                            ", size=" + size + ']');
+
+                    for (HadoopTaskInfo taskInfo : req.tasks()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Submitted task for external execution: 
" + taskInfo);
+
+                        execSvc.submit(new HadoopRunnableTask(log, job, mem, 
taskInfo, nodeDesc.parentNodeId()) {
+                            @Override protected void 
onTaskFinished(HadoopTaskStatus status) {
+                                onTaskFinished0(this, status);
+                            }
+
+                            @Override protected HadoopTaskInput 
createInput(HadoopTaskContext ctx)
+                                throws IgniteCheckedException {
+                                return shuffleJob.input(ctx);
+                            }
+
+                            @Override protected HadoopTaskOutput 
createOutput(HadoopTaskContext ctx)
+                                throws IgniteCheckedException {
+                                return shuffleJob.output(ctx);
+                            }
+                        });
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    for (HadoopTaskInfo info : req.tasks())
+                        notifyTaskFinished(info, new 
HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates executor services.
+     *
+     * @param req Init child process request.
+     */
+    private void initializeExecutors(HadoopPrepareForJobRequest req) {
+        int cpus = Runtime.getRuntime().availableProcessors();
+//
+//        concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
+//        concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, 
cpus);
+
+        execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
+    }
+
+    /**
+     * Updates external process map so that shuffle can proceed with sending 
messages to reducers.
+     *
+     * @param req Update request.
+     */
+    private void updateTasks(final HadoopJobInfoUpdateRequest req) {
+        initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> gridFut) {
+                assert initGuard.get();
+
+                assert req.jobId().equals(job.id());
+
+                if (req.reducersAddresses() != null) {
+                    if 
(shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
+                        shuffleJob.startSending("external",
+                            new IgniteInClosure2X<HadoopProcessDescriptor, 
HadoopShuffleMessage>() {
+                                @Override public void 
applyx(HadoopProcessDescriptor dest,
+                                    HadoopShuffleMessage msg) throws 
IgniteCheckedException {
+                                    comm.sendMessage(dest, msg);
+                                }
+                            });
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Stops all executors and running tasks.
+     */
+    private void shutdown() {
+        if (execSvc != null)
+            execSvc.shutdown(5000);
+
+        if (msgExecSvc != null)
+            msgExecSvc.shutdownNow();
+
+        try {
+            job.dispose(true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to dispose job.", e);
+        }
+    }
+
+    /**
+     * Notifies node about task finish.
+     *
+     * @param run Finished task runnable.
+     * @param status Task status.
+     */
+    private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus 
status) {
+        HadoopTaskInfo info = run.taskInfo();
+
+        int pendingTasks0 = pendingTasks.decrementAndGet();
+
+        if (log.isDebugEnabled())
+            log.debug("Hadoop task execution finished [info=" + info
+                + ", state=" + status.state() + ", waitTime=" + run.waitTime() 
+ ", execTime=" + run.executionTime() +
+                ", pendingTasks=" + pendingTasks0 +
+                ", err=" + status.failCause() + ']');
+
+        assert info.type() == MAP || info.type() == REDUCE : "Only MAP or 
REDUCE tasks are supported.";
+
+        boolean flush = pendingTasks0 == 0 && info.type() == MAP;
+
+        notifyTaskFinished(info, status, flush);
+    }
+
+    /**
+     * @param taskInfo Finished task info.
+     * @param status Task status.
+     */
+    private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final 
HadoopTaskStatus status,
+        boolean flush) {
+
+        final HadoopTaskState state = status.state();
+        final Throwable err = status.failCause();
+
+        if (!flush) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending notification to parent node [taskInfo=" 
+ taskInfo + ", state=" + state +
+                        ", err=" + err + ']');
+
+                comm.sendMessage(nodeDesc, new 
HadoopTaskFinishedMessage(taskInfo, status));
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to send message to parent node (will 
terminate child process).", e);
+
+                shutdown();
+
+                terminate();
+            }
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Flushing shuffle messages before sending last task 
completion notification [taskInfo=" +
+                    taskInfo + ", state=" + state + ", err=" + err + ']');
+
+            final long start = U.currentTimeMillis();
+
+            try {
+                shuffleJob.flush().listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        long end = U.currentTimeMillis();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Finished flushing shuffle messages 
[taskInfo=" + taskInfo +
+                                ", flushTime=" + (end - start) + ']');
+
+                        try {
+                            // Check for errors on shuffle.
+                            f.get();
+
+                            notifyTaskFinished(taskInfo, status, false);
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Failed to flush shuffle messages (will 
fail the task) [taskInfo=" + taskInfo +
+                                ", state=" + state + ", err=" + err + ']', e);
+
+                            notifyTaskFinished(taskInfo,
+                                new HadoopTaskStatus(HadoopTaskState.FAILED, 
e), false);
+                        }
+                    }
+                });
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to flush shuffle messages (will fail the 
task) [taskInfo=" + taskInfo +
+                    ", state=" + state + ", err=" + err + ']', e);
+
+                notifyTaskFinished(taskInfo, new 
HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+            }
+        }
+    }
+
+    /**
+     * Checks if message was received from parent node and prints warning if 
not.
+     *
+     * @param desc Sender process ID.
+     * @param msg Received message.
+     * @return {@code True} if received from parent node.
+     */
+    private boolean validateNodeMessage(HadoopProcessDescriptor desc, 
HadoopMessage msg) {
+        if (!nodeDesc.processId().equals(desc.processId())) {
+            log.warning("Received process control request from unknown process 
(will ignore) [desc=" + desc +
+                ", msg=" + msg + ']');
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops execution of this process.
+     */
+    private void terminate() {
+        System.exit(1);
+    }
+
+    /**
+     * Message listener.
+     */
+    private class MessageListener implements HadoopMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(final HadoopProcessDescriptor 
desc, final HadoopMessage msg) {
+            if (msg instanceof HadoopTaskExecutionRequest) {
+                if (validateNodeMessage(desc, msg))
+                    runTasks((HadoopTaskExecutionRequest)msg);
+            }
+            else if (msg instanceof HadoopJobInfoUpdateRequest) {
+                if (validateNodeMessage(desc, msg))
+                    updateTasks((HadoopJobInfoUpdateRequest)msg);
+            }
+            else if (msg instanceof HadoopPrepareForJobRequest) {
+                if (validateNodeMessage(desc, msg))
+                    prepareProcess((HadoopPrepareForJobRequest)msg);
+            }
+            else if (msg instanceof HadoopShuffleMessage) {
+                if (log.isTraceEnabled())
+                    log.trace("Received shuffle message [desc=" + desc + ", 
msg=" + msg + ']');
+
+                initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+
+                            shuffleJob.onShuffleMessage(m);
+
+                            comm.sendMessage(desc, new 
HadoopShuffleAck(m.id(), m.jobId()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process hadoop shuffle 
message [desc=" + desc + ", msg=" + msg + ']', e);
+                        }
+                    }
+                });
+            }
+            else if (msg instanceof HadoopShuffleAck) {
+                if (log.isTraceEnabled())
+                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" 
+ msg + ']');
+
+                shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
+            }
+            else
+                log.warning("Unknown message received (will ignore) [desc=" + 
desc + ", msg=" + msg + ']');
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+            if (log.isDebugEnabled())
+                log.debug("Lost connection with remote process: " + desc);
+
+            if (desc == null)
+                U.warn(log, "Handshake failed.");
+            else if (desc.processId().equals(nodeDesc.processId())) {
+                log.warning("Child process lost connection with parent node 
(will terminate child process).");
+
+                shutdown();
+
+                terminate();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
new file mode 100644
index 0000000..32880e4
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.logger.log4j.Log4JLogger;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+
+/**
+ * Hadoop external process base class.
+ */
+public class HadoopExternalProcessStarter {
+    /** Path to Log4j configuration file. */
+    public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
+
+    /** Arguments. */
+    private Args args;
+
+    /** System out. */
+    private OutputStream out;
+
+    /** System err. */
+    private OutputStream err;
+
+    /**
+     * @param args Parsed arguments.
+     */
+    public HadoopExternalProcessStarter(Args args) {
+        this.args = args;
+    }
+
+    /**
+     * @param cmdArgs Process arguments.
+     */
+    public static void main(String[] cmdArgs) {
+        try {
+            Args args = arguments(cmdArgs);
+
+            new HadoopExternalProcessStarter(args).run();
+        }
+        catch (Exception e) {
+            System.err.println("Failed");
+
+            System.err.println(e.getMessage());
+
+            e.printStackTrace(System.err);
+        }
+    }
+
+    /**
+     *
+     * @throws Exception
+     */
+    public void run() throws Exception {
+        U.setWorkDirectory(args.workDir, U.getIgniteHome());
+
+        File outputDir = outputDirectory();
+
+        initializeStreams(outputDir);
+
+        ExecutorService msgExecSvc = Executors.newFixedThreadPool(
+            Integer.getInteger("MSG_THREAD_POOL_SIZE", 
Runtime.getRuntime().availableProcessors() * 2));
+
+        IgniteLogger log = logger(outputDir);
+
+        HadoopExternalCommunication comm = new HadoopExternalCommunication(
+            args.nodeId,
+            args.childProcId,
+            new JdkMarshaller(),
+            log,
+            msgExecSvc,
+            "external"
+        );
+
+        comm.start();
+
+        HadoopProcessDescriptor nodeDesc = new 
HadoopProcessDescriptor(args.nodeId, args.parentProcId);
+        nodeDesc.address(args.addr);
+        nodeDesc.tcpPort(args.tcpPort);
+        nodeDesc.sharedMemoryPort(args.shmemPort);
+
+        HadoopChildProcessRunner runner = new HadoopChildProcessRunner();
+
+        runner.start(comm, nodeDesc, msgExecSvc, log);
+
+        System.err.println("Started");
+        System.err.flush();
+
+        System.setOut(new PrintStream(out));
+        System.setErr(new PrintStream(err));
+    }
+
+    /**
+     * @param outputDir Directory for process output.
+     * @throws Exception
+     */
+    private void initializeStreams(File outputDir) throws Exception {
+        out = new FileOutputStream(new File(outputDir, args.childProcId + 
".out"));
+        err = new FileOutputStream(new File(outputDir, args.childProcId + 
".err"));
+    }
+
+    /**
+     * @return Path to output directory.
+     * @throws IOException If failed.
+     */
+    private File outputDirectory() throws IOException {
+        File f = new File(args.out);
+
+        if (!f.exists()) {
+            if (!f.mkdirs())
+                throw new IOException("Failed to create output directory: " + 
args.out);
+        }
+        else {
+            if (f.isFile())
+                throw new IOException("Output directory is a file: " + 
args.out);
+        }
+
+        return f;
+    }
+
+    /**
+     * @param outputDir Directory for process output.
+     * @return Logger.
+     */
+    private IgniteLogger logger(final File outputDir) {
+        final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
+
+        Log4JLogger logger;
+
+        try {
+            logger = url != null ? new Log4JLogger(url) : new 
Log4JLogger(true);
+        }
+        catch (IgniteCheckedException e) {
+            System.err.println("Failed to create URL-based logger. Will use 
default one.");
+
+            e.printStackTrace();
+
+            logger = new Log4JLogger(true);
+        }
+
+        logger.updateFilePath(new IgniteClosure<String, String>() {
+            @Override public String apply(String s) {
+                return new File(outputDir, args.childProcId + 
".log").getAbsolutePath();
+            }
+        });
+
+        return logger;
+    }
+
+    /**
+     * @param processArgs Process arguments.
+     * @return Child process instance.
+     */
+    private static Args arguments(String[] processArgs) throws Exception {
+        Args args = new Args();
+
+        for (int i = 0; i < processArgs.length; i++) {
+            String arg = processArgs[i];
+
+            switch (arg) {
+                case "-cpid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing process ID for '-cpid' 
parameter");
+
+                    String procIdStr = processArgs[++i];
+
+                    args.childProcId = UUID.fromString(procIdStr);
+
+                    break;
+                }
+
+                case "-ppid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing process ID for '-ppid' 
parameter");
+
+                    String procIdStr = processArgs[++i];
+
+                    args.parentProcId = UUID.fromString(procIdStr);
+
+                    break;
+                }
+
+                case "-nid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing node ID for '-nid' 
parameter");
+
+                    String nodeIdStr = processArgs[++i];
+
+                    args.nodeId = UUID.fromString(nodeIdStr);
+
+                    break;
+                }
+
+                case "-addr": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing node address for '-addr' 
parameter");
+
+                    args.addr = processArgs[++i];
+
+                    break;
+                }
+
+                case "-tport": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing tcp port for '-tport' 
parameter");
+
+                    args.tcpPort = Integer.parseInt(processArgs[++i]);
+
+                    break;
+                }
+
+                case "-sport": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing shared memory port for 
'-sport' parameter");
+
+                    args.shmemPort = Integer.parseInt(processArgs[++i]);
+
+                    break;
+                }
+
+                case "-out": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing output folder name for 
'-out' parameter");
+
+                    args.out = processArgs[++i];
+
+                    break;
+                }
+
+                case "-wd": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing work folder name for 
'-wd' parameter");
+
+                    args.workDir = processArgs[++i];
+
+                    break;
+                }
+            }
+        }
+
+        return args;
+    }
+
+    /**
+     * Execution arguments.
+     */
+    private static class Args {
+        /** Process ID. */
+        private UUID childProcId;
+
+        /** Process ID. */
+        private UUID parentProcId;
+
+        /** Process ID. */
+        private UUID nodeId;
+
+        /** Node address. */
+        private String addr;
+
+        /** TCP port */
+        private int tcpPort;
+
+        /** Shmem port. */
+        private int shmemPort = -1;
+
+        /** Output folder. */
+        private String out;
+
+        /** Work directory. */
+        private String workDir;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
new file mode 100644
index 0000000..ddf6a20
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Implements basic lifecycle for communication clients.
+ */
+public abstract class HadoopAbstractCommunicationClient implements 
HadoopCommunicationClient {
+    /** Time when this client was last used. */
+    private volatile long lastUsed = U.currentTimeMillis();
+
+    /** Reservations. */
+    private final AtomicInteger reserves = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override public boolean close() {
+        return reserves.compareAndSet(0, -1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void forceClose() {
+        reserves.set(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean closed() {
+        return reserves.get() == -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean reserve() {
+        while (true) {
+            int r = reserves.get();
+
+            if (r == -1)
+                return false;
+
+            if (reserves.compareAndSet(r, r + 1))
+                return true;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release() {
+        while (true) {
+            int r = reserves.get();
+
+            if (r == -1)
+                return;
+
+            if (reserves.compareAndSet(r, r - 1))
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean reserved() {
+        return reserves.get() > 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIdleTime() {
+        return U.currentTimeMillis() - lastUsed;
+    }
+
+    /**
+     * Updates used time.
+     */
+    protected void markUsed() {
+        lastUsed = U.currentTimeMillis();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopAbstractCommunicationClient.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
new file mode 100644
index 0000000..a325a3d
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+
+/**
+ *
+ */
+public interface HadoopCommunicationClient {
+    /**
+     * @return {@code True} if client has been closed by this call,
+     *      {@code false} if failed to close client (due to concurrent 
reservation or concurrent close).
+     */
+    public boolean close();
+
+    /**
+     * Forces client close.
+     */
+    public void forceClose();
+
+    /**
+     * @return {@code True} if client is closed;
+     */
+    public boolean closed();
+
+    /**
+     * @return {@code True} if client was reserved, {@code false} otherwise.
+     */
+    public boolean reserve();
+
+    /**
+     * Releases this client by decreasing reservations.
+     */
+    public void release();
+
+    /**
+     * @return {@code True} if client was reserved.
+     */
+    public boolean reserved();
+
+    /**
+     * Gets idle time of this client.
+     *
+     * @return Idle time of this client.
+     */
+    public long getIdleTime();
+
+    /**
+     * @param desc Process descriptor.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) 
throws IgniteCheckedException;
+}
\ No newline at end of file

Reply via email to