http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
new file mode 100644
index 0000000..17c2ff5
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Grid client for NIO server.
+ */
+public class HadoopTcpNioCommunicationClient extends 
HadoopAbstractCommunicationClient {
+    /** Socket. */
+    private final GridNioSession ses;
+
+    /**
+     * Constructor for test purposes only.
+     */
+    public HadoopTcpNioCommunicationClient() {
+        ses = null;
+    }
+
+    /**
+     * @param ses Session.
+     */
+    public HadoopTcpNioCommunicationClient(GridNioSession ses) {
+        assert ses != null;
+
+        this.ses = ses;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean close() {
+        boolean res = super.close();
+
+        if (res)
+            ses.close();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void forceClose() {
+        super.forceClose();
+
+        ses.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(HadoopProcessDescriptor desc, 
HadoopMessage msg)
+        throws IgniteCheckedException {
+        if (closed())
+            throw new IgniteCheckedException("Client was closed: " + this);
+
+        GridNioFuture<?> fut = ses.send(msg);
+
+        if (fut.isDone())
+            fut.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIdleTime() {
+        long now = U.currentTimeMillis();
+
+        // Session can be used for receiving and sending.
+        return Math.min(Math.min(now - ses.lastReceiveTime(), now - 
ses.lastSendScheduleTime()),
+            now - ses.lastSendTime());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTcpNioCommunicationClient.class, this, 
super.toString());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
index 974d8d9..0be8bf9 100644
--- 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
@@ -39,7 +39,7 @@ import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import 
org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.typedef.F;

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
index 9180ca6..b4e63d1 100644
--- 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
@@ -20,7 +20,7 @@ package 
org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import 
org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopExecutorService;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jsr166.LongAdder8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
deleted file mode 100644
index 7c43500..0000000
--- 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import 
org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-
-import static 
org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
-
-/**
- * Job tracker self test.
- */
-public class HadoopExternalTaskExecutionSelfTest extends 
HadoopAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected boolean igfsEnabled() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-404";);
-
-        startGrids(gridCount());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
-        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
-        // TODO: IGNITE-404: Uncomment when fixed.
-        //cfg.setExternalExecution(true);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setMarshaller(new JdkMarshaller());
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleTaskSubmit() throws Exception {
-        String testInputFile = "/test";
-
-        prepareTestFile(testInputFile);
-
-        Configuration cfg = new Configuration();
-
-        setupFileSystems(cfg);
-
-        Job job = Job.getInstance(cfg);
-
-        job.setMapperClass(TestMapper.class);
-        job.setCombinerClass(TestReducer.class);
-        job.setReducerClass(TestReducer.class);
-
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(IntWritable.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        job.setNumReduceTasks(1);
-
-        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/" + testInputFile));
-        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output"));
-
-        job.setJarByClass(getClass());
-
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration()));
-
-        fut.get();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMapperException() throws Exception {
-        String testInputFile = "/test";
-
-        prepareTestFile(testInputFile);
-
-        Configuration cfg = new Configuration();
-
-        setupFileSystems(cfg);
-
-        Job job = Job.getInstance(cfg);
-
-        job.setMapperClass(TestFailingMapper.class);
-        job.setCombinerClass(TestReducer.class);
-        job.setReducerClass(TestReducer.class);
-
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(IntWritable.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        job.setNumReduceTasks(1);
-
-        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/" + testInputFile));
-        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output"));
-
-        job.setJarByClass(getClass());
-
-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 1),
-            createJobInfo(job.getConfiguration()));
-
-        try {
-            fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            IOException exp = X.cause(e, IOException.class);
-
-            assertNotNull(exp);
-            assertEquals("Test failure", exp.getMessage());
-        }
-    }
-
-    /**
-     * @param filePath File path to prepare.
-     * @throws Exception If failed.
-     */
-    private void prepareTestFile(String filePath) throws Exception {
-        IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
-
-        try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) 
{
-            PrintWriter wr = new PrintWriter(new OutputStreamWriter(out));
-
-            for (int i = 0; i < 1000; i++)
-                wr.println("Hello, world: " + i);
-
-            wr.flush();
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestMapper extends Mapper<Object, Text, Text, 
IntWritable> {
-        /** One constant. */
-        private IntWritable one = new IntWritable(1);
-
-        /** Line constant. */
-        private Text line = new Text("line");
-
-        @Override protected void map(Object key, Text val, Context ctx) throws 
IOException, InterruptedException {
-            ctx.write(line, one);
-        }
-    }
-
-    /**
-     * Failing mapper.
-     */
-    private static class TestFailingMapper extends Mapper<Object, Text, Text, 
IntWritable> {
-        @Override protected void map(Object key, Text val, Context c) throws 
IOException, InterruptedException {
-            throw new IOException("Test failure");
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestReducer extends Reducer<Text, IntWritable, Text, 
IntWritable> {
-        /** Line constant. */
-        private Text line = new Text("line");
-
-        @Override protected void setup(Context ctx) throws IOException, 
InterruptedException {
-            super.setup(ctx);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void reduce(Text key, Iterable<IntWritable> 
values, Context ctx)
-            throws IOException, InterruptedException {
-            int s = 0;
-
-            for (IntWritable val : values)
-                s += val.get();
-
-            System.out.println(">>>> Reduced: " + s);
-
-            ctx.write(line, new IntWritable(s));
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
deleted file mode 100644
index a5ee04b..0000000
--- 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import 
org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Tests Hadoop external communication component.
- */
-public class HadoopExternalCommunicationSelfTest extends 
GridCommonAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-404";);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleMessageSendingTcp() throws Exception {
-        checkSimpleMessageSending(false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleMessageSendingShmem() throws Exception {
-        checkSimpleMessageSending(true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkSimpleMessageSending(boolean useShmem) throws Exception {
-        UUID parentNodeId = UUID.randomUUID();
-
-        Marshaller marsh = new JdkMarshaller();
-
-        IgniteLogger log = log();
-
-        HadoopExternalCommunication[] comms = new 
HadoopExternalCommunication[4];
-
-        try {
-            String name = "grid";
-
-            TestHadoopListener[] lsnrs = new TestHadoopListener[4];
-
-            int msgs = 10;
-
-            for (int i = 0; i < comms.length; i++) {
-                comms[i] = new HadoopExternalCommunication(parentNodeId, 
UUID.randomUUID(), marsh, log,
-                    Executors.newFixedThreadPool(1), name + i);
-
-                if (useShmem)
-                    comms[i].setSharedMemoryPort(14000);
-
-                lsnrs[i] = new TestHadoopListener(msgs);
-
-                comms[i].setListener(lsnrs[i]);
-
-                comms[i].start();
-            }
-
-            for (int r = 0; r < msgs; r++) {
-                for (int from = 0; from < comms.length; from++) {
-                    for (int to = 0; to < comms.length; to++) {
-                        if (from == to)
-                            continue;
-
-                        
comms[from].sendMessage(comms[to].localProcessDescriptor(), new 
TestMessage(from, to));
-                    }
-                }
-            }
-
-            U.sleep(1000);
-
-            for (TestHadoopListener lsnr : lsnrs) {
-                lsnr.await(3_000);
-
-                assertEquals(String.valueOf(lsnr.messages()), msgs * 
(comms.length - 1), lsnr.messages().size());
-            }
-        }
-        finally {
-            for (HadoopExternalCommunication comm : comms) {
-                if (comm != null)
-                    comm.stop();
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestHadoopListener implements HadoopMessageListener {
-        /** Received messages (array list is safe because executor has one 
thread). */
-        private Collection<TestMessage> msgs = new ArrayList<>();
-
-        /** Await latch. */
-        private CountDownLatch receiveLatch;
-
-        /**
-         * @param msgs Number of messages to await.
-         */
-        private TestHadoopListener(int msgs) {
-            receiveLatch = new CountDownLatch(msgs);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(HadoopProcessDescriptor desc, 
HadoopMessage msg) {
-            assert msg instanceof TestMessage;
-
-            msgs.add((TestMessage)msg);
-
-            receiveLatch.countDown();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
-            // No-op.
-        }
-
-        /**
-         * @return Received messages.
-         */
-        public Collection<TestMessage> messages() {
-            return msgs;
-        }
-
-        /**
-         * @param millis Time to await.
-         * @throws InterruptedException If wait interrupted.
-         */
-        public void await(int millis) throws InterruptedException {
-            receiveLatch.await(millis, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestMessage implements HadoopMessage {
-        /** From index. */
-        private int from;
-
-        /** To index. */
-        private int to;
-
-        /**
-         * @param from From index.
-         * @param to To index.
-         */
-        private TestMessage(int from, int to) {
-            this.from = from;
-            this.to = to;
-        }
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public TestMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return From index.
-         */
-        public int from() {
-            return from;
-        }
-
-        /**
-         * @return To index.
-         */
-        public int to() {
-            return to;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeInt(from);
-            out.writeInt(to);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            from = in.readInt();
-            to = in.readInt();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
new file mode 100644
index 0000000..2385668
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+
+import static 
org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopExternalTaskExecutionSelfTest extends 
HadoopAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-404";);
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new JdkMarshaller());
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleTaskSubmit() throws Exception {
+        String testInputFile = "/test";
+
+        prepareTestFile(testInputFile);
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        Job job = Job.getInstance(cfg);
+
+        job.setMapperClass(TestMapper.class);
+        job.setCombinerClass(TestReducer.class);
+        job.setReducerClass(TestReducer.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/" + testInputFile));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output"));
+
+        job.setJarByClass(getClass());
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 1),
+            createJobInfo(job.getConfiguration()));
+
+        fut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapperException() throws Exception {
+        String testInputFile = "/test";
+
+        prepareTestFile(testInputFile);
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        Job job = Job.getInstance(cfg);
+
+        job.setMapperClass(TestFailingMapper.class);
+        job.setCombinerClass(TestReducer.class);
+        job.setReducerClass(TestReducer.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/" + testInputFile));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output"));
+
+        job.setJarByClass(getClass());
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 1),
+            createJobInfo(job.getConfiguration()));
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            IOException exp = X.cause(e, IOException.class);
+
+            assertNotNull(exp);
+            assertEquals("Test failure", exp.getMessage());
+        }
+    }
+
+    /**
+     * @param filePath File path to prepare.
+     * @throws Exception If failed.
+     */
+    private void prepareTestFile(String filePath) throws Exception {
+        IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
+
+        try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) 
{
+            PrintWriter wr = new PrintWriter(new OutputStreamWriter(out));
+
+            for (int i = 0; i < 1000; i++)
+                wr.println("Hello, world: " + i);
+
+            wr.flush();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestMapper extends Mapper<Object, Text, Text, 
IntWritable> {
+        /** One constant. */
+        private IntWritable one = new IntWritable(1);
+
+        /** Line constant. */
+        private Text line = new Text("line");
+
+        @Override protected void map(Object key, Text val, Context ctx) throws 
IOException, InterruptedException {
+            ctx.write(line, one);
+        }
+    }
+
+    /**
+     * Failing mapper.
+     */
+    private static class TestFailingMapper extends Mapper<Object, Text, Text, 
IntWritable> {
+        @Override protected void map(Object key, Text val, Context c) throws 
IOException, InterruptedException {
+            throw new IOException("Test failure");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestReducer extends Reducer<Text, IntWritable, Text, 
IntWritable> {
+        /** Line constant. */
+        private Text line = new Text("line");
+
+        @Override protected void setup(Context ctx) throws IOException, 
InterruptedException {
+            super.setup(ctx);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> 
values, Context ctx)
+            throws IOException, InterruptedException {
+            int s = 0;
+
+            for (IntWritable val : values)
+                s += val.get();
+
+            System.out.println(">>>> Reduced: " + s);
+
+            ctx.write(line, new IntWritable(s));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
new file mode 100644
index 0000000..851c3af
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests Hadoop external communication component.
+ */
+public class HadoopExternalCommunicationSelfTest extends 
GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-404";);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleMessageSendingTcp() throws Exception {
+        checkSimpleMessageSending(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleMessageSendingShmem() throws Exception {
+        checkSimpleMessageSending(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkSimpleMessageSending(boolean useShmem) throws Exception {
+        UUID parentNodeId = UUID.randomUUID();
+
+        Marshaller marsh = new JdkMarshaller();
+
+        IgniteLogger log = log();
+
+        HadoopExternalCommunication[] comms = new 
HadoopExternalCommunication[4];
+
+        try {
+            String name = "grid";
+
+            TestHadoopListener[] lsnrs = new TestHadoopListener[4];
+
+            int msgs = 10;
+
+            for (int i = 0; i < comms.length; i++) {
+                comms[i] = new HadoopExternalCommunication(parentNodeId, 
UUID.randomUUID(), marsh, log,
+                    Executors.newFixedThreadPool(1), name + i);
+
+                if (useShmem)
+                    comms[i].setSharedMemoryPort(14000);
+
+                lsnrs[i] = new TestHadoopListener(msgs);
+
+                comms[i].setListener(lsnrs[i]);
+
+                comms[i].start();
+            }
+
+            for (int r = 0; r < msgs; r++) {
+                for (int from = 0; from < comms.length; from++) {
+                    for (int to = 0; to < comms.length; to++) {
+                        if (from == to)
+                            continue;
+
+                        
comms[from].sendMessage(comms[to].localProcessDescriptor(), new 
TestMessage(from, to));
+                    }
+                }
+            }
+
+            U.sleep(1000);
+
+            for (TestHadoopListener lsnr : lsnrs) {
+                lsnr.await(3_000);
+
+                assertEquals(String.valueOf(lsnr.messages()), msgs * 
(comms.length - 1), lsnr.messages().size());
+            }
+        }
+        finally {
+            for (HadoopExternalCommunication comm : comms) {
+                if (comm != null)
+                    comm.stop();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestHadoopListener implements HadoopMessageListener {
+        /** Received messages (array list is safe because executor has one 
thread). */
+        private Collection<TestMessage> msgs = new ArrayList<>();
+
+        /** Await latch. */
+        private CountDownLatch receiveLatch;
+
+        /**
+         * @param msgs Number of messages to await.
+         */
+        private TestHadoopListener(int msgs) {
+            receiveLatch = new CountDownLatch(msgs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(HadoopProcessDescriptor desc, 
HadoopMessage msg) {
+            assert msg instanceof TestMessage;
+
+            msgs.add((TestMessage)msg);
+
+            receiveLatch.countDown();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+            // No-op.
+        }
+
+        /**
+         * @return Received messages.
+         */
+        public Collection<TestMessage> messages() {
+            return msgs;
+        }
+
+        /**
+         * @param millis Time to await.
+         * @throws InterruptedException If wait interrupted.
+         */
+        public void await(int millis) throws InterruptedException {
+            receiveLatch.await(millis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestMessage implements HadoopMessage {
+        /** From index. */
+        private int from;
+
+        /** To index. */
+        private int to;
+
+        /**
+         * @param from From index.
+         * @param to To index.
+         */
+        private TestMessage(int from, int to) {
+            this.from = from;
+            this.to = to;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestMessage() {
+            // No-op.
+        }
+
+        /**
+         * @return From index.
+         */
+        public int from() {
+            return from;
+        }
+
+        /**
+         * @return To index.
+         */
+        public int to() {
+            return to;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeInt(from);
+            out.writeInt(to);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            from = in.readInt();
+            to = in.readInt();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
new file mode 100644
index 0000000..b22d291
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+/**
+* State of the task.
+*/
+public enum HadoopTaskState {
+    /** Running task. */
+    RUNNING,
+
+    /** Completed task. */
+    COMPLETED,
+
+    /** Failed task. */
+    FAILED,
+
+    /** Canceled task. */
+    CANCELED,
+
+    /** Process crashed. */
+    CRASHED
+}
\ No newline at end of file

Reply via email to