http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
deleted file mode 100644
index 32880e4..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.URL;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.logger.log4j.Log4JLogger;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-
-/**
- * Hadoop external process base class.
- */
-public class HadoopExternalProcessStarter {
-    /** Path to Log4j configuration file. */
-    public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
-
-    /** Arguments. */
-    private Args args;
-
-    /** System out. */
-    private OutputStream out;
-
-    /** System err. */
-    private OutputStream err;
-
-    /**
-     * @param args Parsed arguments.
-     */
-    public HadoopExternalProcessStarter(Args args) {
-        this.args = args;
-    }
-
-    /**
-     * @param cmdArgs Process arguments.
-     */
-    public static void main(String[] cmdArgs) {
-        try {
-            Args args = arguments(cmdArgs);
-
-            new HadoopExternalProcessStarter(args).run();
-        }
-        catch (Exception e) {
-            System.err.println("Failed");
-
-            System.err.println(e.getMessage());
-
-            e.printStackTrace(System.err);
-        }
-    }
-
-    /**
-     *
-     * @throws Exception
-     */
-    public void run() throws Exception {
-        U.setWorkDirectory(args.workDir, U.getIgniteHome());
-
-        File outputDir = outputDirectory();
-
-        initializeStreams(outputDir);
-
-        ExecutorService msgExecSvc = Executors.newFixedThreadPool(
-            Integer.getInteger("MSG_THREAD_POOL_SIZE", 
Runtime.getRuntime().availableProcessors() * 2));
-
-        IgniteLogger log = logger(outputDir);
-
-        HadoopExternalCommunication comm = new HadoopExternalCommunication(
-            args.nodeId,
-            args.childProcId,
-            new JdkMarshaller(),
-            log,
-            msgExecSvc,
-            "external"
-        );
-
-        comm.start();
-
-        HadoopProcessDescriptor nodeDesc = new 
HadoopProcessDescriptor(args.nodeId, args.parentProcId);
-        nodeDesc.address(args.addr);
-        nodeDesc.tcpPort(args.tcpPort);
-        nodeDesc.sharedMemoryPort(args.shmemPort);
-
-        HadoopChildProcessRunner runner = new HadoopChildProcessRunner();
-
-        runner.start(comm, nodeDesc, msgExecSvc, log);
-
-        System.err.println("Started");
-        System.err.flush();
-
-        System.setOut(new PrintStream(out));
-        System.setErr(new PrintStream(err));
-    }
-
-    /**
-     * @param outputDir Directory for process output.
-     * @throws Exception
-     */
-    private void initializeStreams(File outputDir) throws Exception {
-        out = new FileOutputStream(new File(outputDir, args.childProcId + 
".out"));
-        err = new FileOutputStream(new File(outputDir, args.childProcId + 
".err"));
-    }
-
-    /**
-     * @return Path to output directory.
-     * @throws IOException If failed.
-     */
-    private File outputDirectory() throws IOException {
-        File f = new File(args.out);
-
-        if (!f.exists()) {
-            if (!f.mkdirs())
-                throw new IOException("Failed to create output directory: " + 
args.out);
-        }
-        else {
-            if (f.isFile())
-                throw new IOException("Output directory is a file: " + 
args.out);
-        }
-
-        return f;
-    }
-
-    /**
-     * @param outputDir Directory for process output.
-     * @return Logger.
-     */
-    private IgniteLogger logger(final File outputDir) {
-        final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
-
-        Log4JLogger logger;
-
-        try {
-            logger = url != null ? new Log4JLogger(url) : new 
Log4JLogger(true);
-        }
-        catch (IgniteCheckedException e) {
-            System.err.println("Failed to create URL-based logger. Will use 
default one.");
-
-            e.printStackTrace();
-
-            logger = new Log4JLogger(true);
-        }
-
-        logger.updateFilePath(new IgniteClosure<String, String>() {
-            @Override public String apply(String s) {
-                return new File(outputDir, args.childProcId + 
".log").getAbsolutePath();
-            }
-        });
-
-        return logger;
-    }
-
-    /**
-     * @param processArgs Process arguments.
-     * @return Child process instance.
-     */
-    private static Args arguments(String[] processArgs) throws Exception {
-        Args args = new Args();
-
-        for (int i = 0; i < processArgs.length; i++) {
-            String arg = processArgs[i];
-
-            switch (arg) {
-                case "-cpid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing process ID for '-cpid' 
parameter");
-
-                    String procIdStr = processArgs[++i];
-
-                    args.childProcId = UUID.fromString(procIdStr);
-
-                    break;
-                }
-
-                case "-ppid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing process ID for '-ppid' 
parameter");
-
-                    String procIdStr = processArgs[++i];
-
-                    args.parentProcId = UUID.fromString(procIdStr);
-
-                    break;
-                }
-
-                case "-nid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing node ID for '-nid' 
parameter");
-
-                    String nodeIdStr = processArgs[++i];
-
-                    args.nodeId = UUID.fromString(nodeIdStr);
-
-                    break;
-                }
-
-                case "-addr": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing node address for '-addr' 
parameter");
-
-                    args.addr = processArgs[++i];
-
-                    break;
-                }
-
-                case "-tport": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing tcp port for '-tport' 
parameter");
-
-                    args.tcpPort = Integer.parseInt(processArgs[++i]);
-
-                    break;
-                }
-
-                case "-sport": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing shared memory port for 
'-sport' parameter");
-
-                    args.shmemPort = Integer.parseInt(processArgs[++i]);
-
-                    break;
-                }
-
-                case "-out": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing output folder name for 
'-out' parameter");
-
-                    args.out = processArgs[++i];
-
-                    break;
-                }
-
-                case "-wd": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing work folder name for 
'-wd' parameter");
-
-                    args.workDir = processArgs[++i];
-
-                    break;
-                }
-            }
-        }
-
-        return args;
-    }
-
-    /**
-     * Execution arguments.
-     */
-    private static class Args {
-        /** Process ID. */
-        private UUID childProcId;
-
-        /** Process ID. */
-        private UUID parentProcId;
-
-        /** Process ID. */
-        private UUID nodeId;
-
-        /** Node address. */
-        private String addr;
-
-        /** TCP port */
-        private int tcpPort;
-
-        /** Shmem port. */
-        private int shmemPort = -1;
-
-        /** Output folder. */
-        private String out;
-
-        /** Work directory. */
-        private String workDir;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
deleted file mode 100644
index ddf6a20..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Implements basic lifecycle for communication clients.
- */
-public abstract class HadoopAbstractCommunicationClient implements 
HadoopCommunicationClient {
-    /** Time when this client was last used. */
-    private volatile long lastUsed = U.currentTimeMillis();
-
-    /** Reservations. */
-    private final AtomicInteger reserves = new AtomicInteger();
-
-    /** {@inheritDoc} */
-    @Override public boolean close() {
-        return reserves.compareAndSet(0, -1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void forceClose() {
-        reserves.set(-1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean closed() {
-        return reserves.get() == -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean reserve() {
-        while (true) {
-            int r = reserves.get();
-
-            if (r == -1)
-                return false;
-
-            if (reserves.compareAndSet(r, r + 1))
-                return true;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void release() {
-        while (true) {
-            int r = reserves.get();
-
-            if (r == -1)
-                return;
-
-            if (reserves.compareAndSet(r, r - 1))
-                return;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean reserved() {
-        return reserves.get() > 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getIdleTime() {
-        return U.currentTimeMillis() - lastUsed;
-    }
-
-    /**
-     * Updates used time.
-     */
-    protected void markUsed() {
-        lastUsed = U.currentTimeMillis();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopAbstractCommunicationClient.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
deleted file mode 100644
index a325a3d..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopCommunicationClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-
-/**
- *
- */
-public interface HadoopCommunicationClient {
-    /**
-     * @return {@code True} if client has been closed by this call,
-     *      {@code false} if failed to close client (due to concurrent 
reservation or concurrent close).
-     */
-    public boolean close();
-
-    /**
-     * Forces client close.
-     */
-    public void forceClose();
-
-    /**
-     * @return {@code True} if client is closed;
-     */
-    public boolean closed();
-
-    /**
-     * @return {@code True} if client was reserved, {@code false} otherwise.
-     */
-    public boolean reserve();
-
-    /**
-     * Releases this client by decreasing reservations.
-     */
-    public void release();
-
-    /**
-     * @return {@code True} if client was reserved.
-     */
-    public boolean reserved();
-
-    /**
-     * Gets idle time of this client.
-     *
-     * @return Idle time of this client.
-     */
-    public long getIdleTime();
-
-    /**
-     * @param desc Process descriptor.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) 
throws IgniteCheckedException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
deleted file mode 100644
index 1d59a95..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ /dev/null
@@ -1,1460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.SocketChannel;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.GridKeyLock;
-import org.apache.ignite.internal.util.ipc.IpcEndpoint;
-import 
org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryClientEndpoint;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
-import org.apache.ignite.internal.util.nio.GridBufferedParser;
-import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
-import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.nio.GridNioServerListener;
-import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
-
-/**
- * Hadoop external communication class.
- */
-public class HadoopExternalCommunication {
-    /** IPC error message. */
-    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate 
shared memory segment " +
-        "(switching to TCP, may be slower).";
-
-    /** Default port which node sets listener to (value is <tt>47100</tt>). */
-    public static final int DFLT_PORT = 27100;
-
-    /** Default connection timeout (value is <tt>1000</tt>ms). */
-    public static final long DFLT_CONN_TIMEOUT = 1000;
-
-    /** Default Maximum connection timeout (value is <tt>600,000</tt>ms). */
-    public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
-
-    /** Default reconnect attempts count (value is <tt>10</tt>). */
-    public static final int DFLT_RECONNECT_CNT = 10;
-
-    /** Default message queue limit per connection (for incoming and outgoing 
. */
-    public static final int DFLT_MSG_QUEUE_LIMIT = 
GridNioServer.DFLT_SEND_QUEUE_LIMIT;
-
-    /**
-     * Default count of selectors for TCP server equals to
-     * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
-     */
-    public static final int DFLT_SELECTORS_CNT = 1;
-
-    /** Node ID meta for session. */
-    private static final int PROCESS_META = 
GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Handshake timeout meta for session. */
-    private static final int HANDSHAKE_FINISH_META = 
GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Message tracker meta for session. */
-    private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
-
-    /**
-     * Default local port range (value is <tt>100</tt>).
-     * See {@link #setLocalPortRange(int)} for details.
-     */
-    public static final int DFLT_PORT_RANGE = 100;
-
-    /** Default value for {@code TCP_NODELAY} socket option (value is 
<tt>true</tt>). */
-    public static final boolean DFLT_TCP_NODELAY = true;
-
-    /** Server listener. */
-    private final GridNioServerListener<HadoopMessage> srvLsnr =
-        new GridNioServerListenerAdapter<HadoopMessage>() {
-            @Override public void onConnected(GridNioSession ses) {
-                HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
-                assert desc != null : "Received connected notification without 
finished handshake: " + ses;
-            }
-
-            /** {@inheritDoc} */
-            @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
-                if (log.isDebugEnabled())
-                    log.debug("Closed connection for session: " + ses);
-
-                if (e != null)
-                    U.error(log, "Session disconnected due to exception: " + 
ses, e);
-
-                HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
-                if (desc != null) {
-                    HadoopCommunicationClient rmv = 
clients.remove(desc.processId());
-
-                    if (rmv != null)
-                        rmv.forceClose();
-                }
-
-                HadoopMessageListener lsnr0 = lsnr;
-
-                if (lsnr0 != null)
-                    // Notify listener about connection close.
-                    lsnr0.onConnectionLost(desc);
-            }
-
-            /** {@inheritDoc} */
-            @Override public void onMessage(GridNioSession ses, HadoopMessage 
msg) {
-                
notifyListener(ses.<HadoopProcessDescriptor>meta(PROCESS_META), msg);
-
-                if (msgQueueLimit > 0) {
-                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
-                    assert tracker != null : "Missing tracker for limited 
message queue: " + ses;
-
-                    tracker.run();
-                }
-            }
-        };
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Local process descriptor. */
-    private HadoopProcessDescriptor locProcDesc;
-
-    /** Marshaller. */
-    private Marshaller marsh;
-
-    /** Message notification executor service. */
-    private ExecutorService execSvc;
-
-    /** Grid name. */
-    private String gridName;
-
-    /** Complex variable that represents this node IP address. */
-    private volatile InetAddress locHost;
-
-    /** Local port which node uses. */
-    private int locPort = DFLT_PORT;
-
-    /** Local port range. */
-    private int locPortRange = DFLT_PORT_RANGE;
-
-    /** Local port which node uses to accept shared memory connections. */
-    private int shmemPort = -1;
-
-    /** Allocate direct buffer or heap buffer. */
-    private boolean directBuf = true;
-
-    /** Connect timeout. */
-    private long connTimeout = DFLT_CONN_TIMEOUT;
-
-    /** Maximum connect timeout. */
-    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
-
-    /** Reconnect attempts count. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private int reconCnt = DFLT_RECONNECT_CNT;
-
-    /** Socket send buffer. */
-    private int sockSndBuf;
-
-    /** Socket receive buffer. */
-    private int sockRcvBuf;
-
-    /** Message queue limit. */
-    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
-
-    /** NIO server. */
-    private GridNioServer<HadoopMessage> nioSrvr;
-
-    /** Shared memory server. */
-    private IpcSharedMemoryServerEndpoint shmemSrv;
-
-    /** {@code TCP_NODELAY} option value for created sockets. */
-    private boolean tcpNoDelay = DFLT_TCP_NODELAY;
-
-    /** Shared memory accept worker. */
-    private ShmemAcceptWorker shmemAcceptWorker;
-
-    /** Shared memory workers. */
-    private final Collection<ShmemWorker> shmemWorkers = new 
ConcurrentLinkedDeque8<>();
-
-    /** Clients. */
-    private final ConcurrentMap<UUID, HadoopCommunicationClient> clients = 
GridConcurrentFactory.newMap();
-
-    /** Message listener. */
-    private volatile HadoopMessageListener lsnr;
-
-    /** Bound port. */
-    private int boundTcpPort = -1;
-
-    /** Bound port for shared memory server. */
-    private int boundTcpShmemPort = -1;
-
-    /** Count of selectors to use in TCP server. */
-    private int selectorsCnt = DFLT_SELECTORS_CNT;
-
-    /** Local node ID message. */
-    private ProcessHandshakeMessage locIdMsg;
-
-    /** Locks. */
-    private final GridKeyLock locks = new GridKeyLock();
-
-    /**
-     * @param parentNodeId Parent node ID.
-     * @param procId Process ID.
-     * @param marsh Marshaller to use.
-     * @param log Logger.
-     * @param execSvc Executor service for message notification.
-     * @param gridName Grid name.
-     */
-    public HadoopExternalCommunication(
-        UUID parentNodeId,
-        UUID procId,
-        Marshaller marsh,
-        IgniteLogger log,
-        ExecutorService execSvc,
-        String gridName
-    ) {
-        locProcDesc = new HadoopProcessDescriptor(parentNodeId, procId);
-
-        this.marsh = marsh;
-        this.log = log.getLogger(HadoopExternalCommunication.class);
-        this.execSvc = execSvc;
-        this.gridName = gridName;
-    }
-
-    /**
-     * Sets local port for socket binding.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PORT}.
-     *
-     * @param locPort Port number.
-     */
-    public void setLocalPort(int locPort) {
-        this.locPort = locPort;
-    }
-
-    /**
-     * Gets local port for socket binding.
-     *
-     * @return Local port.
-     */
-    public int getLocalPort() {
-        return locPort;
-    }
-
-    /**
-     * Sets local port range for local host ports (value must greater than or 
equal to <tt>0</tt>).
-     * If provided local port (see {@link #setLocalPort(int)}} is occupied,
-     * implementation will try to increment the port number for as long as it 
is less than
-     * initial value plus this range.
-     * <p>
-     * If port range value is <tt>0</tt>, then implementation will try bind 
only to the port provided by
-     * {@link #setLocalPort(int)} method and fail if binding to this port did 
not succeed.
-     * <p>
-     * Local port range is very useful during development when more than one 
grid nodes need to run
-     * on the same physical machine.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PORT_RANGE}.
-     *
-     * @param locPortRange New local port range.
-     */
-    public void setLocalPortRange(int locPortRange) {
-        this.locPortRange = locPortRange;
-    }
-
-    /**
-     * @return Local port range.
-     */
-    public int getLocalPortRange() {
-        return locPortRange;
-    }
-
-    /**
-     * Sets local port to accept shared memory connections.
-     * <p>
-     * If set to {@code -1} shared memory communication will be disabled.
-     * <p>
-     * If not provided, shared memory is disabled.
-     *
-     * @param shmemPort Port number.
-     */
-    public void setSharedMemoryPort(int shmemPort) {
-        this.shmemPort = shmemPort;
-    }
-
-    /**
-     * Gets shared memory port to accept incoming connections.
-     *
-     * @return Shared memory port.
-     */
-    public int getSharedMemoryPort() {
-        return shmemPort;
-    }
-
-    /**
-     * Sets connect timeout used when establishing connection
-     * with remote nodes.
-     * <p>
-     * {@code 0} is interpreted as infinite timeout.
-     * <p>
-     * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
-     *
-     * @param connTimeout Connect timeout.
-     */
-    public void setConnectTimeout(long connTimeout) {
-        this.connTimeout = connTimeout;
-    }
-
-    /**
-     * @return Connection timeout.
-     */
-    public long getConnectTimeout() {
-        return connTimeout;
-    }
-
-    /**
-     * Sets maximum connect timeout. If handshake is not established within 
connect timeout,
-     * then SPI tries to repeat handshake procedure with increased connect 
timeout.
-     * Connect timeout can grow till maximum timeout value,
-     * if maximum timeout value is reached then the handshake is considered as 
failed.
-     * <p>
-     * {@code 0} is interpreted as infinite timeout.
-     * <p>
-     * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
-     *
-     * @param maxConnTimeout Maximum connect timeout.
-     */
-    public void setMaxConnectTimeout(long maxConnTimeout) {
-        this.maxConnTimeout = maxConnTimeout;
-    }
-
-    /**
-     * Gets maximum connection timeout.
-     *
-     * @return Maximum connection timeout.
-     */
-    public long getMaxConnectTimeout() {
-        return maxConnTimeout;
-    }
-
-    /**
-     * Sets maximum number of reconnect attempts used when establishing 
connection
-     * with remote nodes.
-     * <p>
-     * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
-     *
-     * @param reconCnt Maximum number of reconnection attempts.
-     */
-    public void setReconnectCount(int reconCnt) {
-        this.reconCnt = reconCnt;
-    }
-
-    /**
-     * @return Reconnect count.
-     */
-    public int getReconnectCount() {
-        return reconCnt;
-    }
-
-    /**
-     * Sets flag to allocate direct or heap buffer in SPI.
-     * If value is {@code true}, then SPI will use {@link 
ByteBuffer#allocateDirect(int)} call.
-     * Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call.
-     * <p>
-     * If not provided, default value is {@code true}.
-     *
-     * @param directBuf Flag indicates to allocate direct or heap buffer in 
SPI.
-     */
-    public void setDirectBuffer(boolean directBuf) {
-        this.directBuf = directBuf;
-    }
-
-    /**
-     * @return Direct buffer flag.
-     */
-    public boolean isDirectBuffer() {
-        return directBuf;
-    }
-
-    /**
-     * Sets the count of selectors te be used in TCP server.
-     * <p/>
-     * If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
-     *
-     * @param selectorsCnt Selectors count.
-     */
-    public void setSelectorsCount(int selectorsCnt) {
-        this.selectorsCnt = selectorsCnt;
-    }
-
-    /**
-     * @return Number of selectors to use.
-     */
-    public int getSelectorsCount() {
-        return selectorsCnt;
-    }
-
-    /**
-     * Sets value for {@code TCP_NODELAY} socket option. Each
-     * socket will be opened using provided value.
-     * <p>
-     * Setting this option to {@code true} disables Nagle's algorithm
-     * for socket decreasing latency and delivery time for small messages.
-     * <p>
-     * For systems that work under heavy network load it is advisable to
-     * set this value to {@code false}.
-     * <p>
-     * If not provided, default value is {@link #DFLT_TCP_NODELAY}.
-     *
-     * @param tcpNoDelay {@code True} to disable TCP delay.
-     */
-    public void setTcpNoDelay(boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    /**
-     * @return {@code TCP_NO_DELAY} flag.
-     */
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    /**
-     * Sets receive buffer size for sockets created or accepted by this SPI.
-     * <p>
-     * If not provided, default is {@code 0} which leaves buffer unchanged 
after
-     * socket creation (OS defaults).
-     *
-     * @param sockRcvBuf Socket receive buffer size.
-     */
-    public void setSocketReceiveBuffer(int sockRcvBuf) {
-        this.sockRcvBuf = sockRcvBuf;
-    }
-
-    /**
-     * @return Socket receive buffer size.
-     */
-    public int getSocketReceiveBuffer() {
-        return sockRcvBuf;
-    }
-
-    /**
-     * Sets send buffer size for sockets created or accepted by this SPI.
-     * <p>
-     * If not provided, default is {@code 0} which leaves the buffer unchanged
-     * after socket creation (OS defaults).
-     *
-     * @param sockSndBuf Socket send buffer size.
-     */
-    public void setSocketSendBuffer(int sockSndBuf) {
-        this.sockSndBuf = sockSndBuf;
-    }
-
-    /**
-     * @return Socket send buffer size.
-     */
-    public int getSocketSendBuffer() {
-        return sockSndBuf;
-    }
-
-    /**
-     * Sets message queue limit for incoming and outgoing messages.
-     * <p>
-     * When set to positive number send queue is limited to the configured 
value.
-     * {@code 0} disables the size limitations.
-     * <p>
-     * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
-     *
-     * @param msgQueueLimit Send queue size limit.
-     */
-    public void setMessageQueueLimit(int msgQueueLimit) {
-        this.msgQueueLimit = msgQueueLimit;
-    }
-
-    /**
-     * @return Message queue size limit.
-     */
-    public int getMessageQueueLimit() {
-        return msgQueueLimit;
-    }
-
-    /**
-     * Sets Hadoop communication message listener.
-     *
-     * @param lsnr Message listener.
-     */
-    public void setListener(HadoopMessageListener lsnr) {
-        this.lsnr = lsnr;
-    }
-
-    /**
-     * @return Outbound message queue size.
-     */
-    public int getOutboundMessagesQueueSize() {
-        return nioSrvr.outboundMessagesQueueSize();
-    }
-
-    /**
-     * Starts communication.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void start() throws IgniteCheckedException {
-        try {
-            locHost = U.getLocalHost();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to initialize local 
address.", e);
-        }
-
-        try {
-            shmemSrv = resetShmemServer();
-        }
-        catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to start shared memory communication server.", 
e);
-        }
-
-        try {
-            // This method potentially resets local port to the value
-            // local node was bound to.
-            nioSrvr = resetNioServer();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteCheckedException("Failed to initialize TCP server: 
" + locHost, e);
-        }
-
-        locProcDesc.address(locHost.getHostAddress());
-        locProcDesc.sharedMemoryPort(boundTcpShmemPort);
-        locProcDesc.tcpPort(boundTcpPort);
-
-        locIdMsg = new ProcessHandshakeMessage(locProcDesc);
-
-        if (shmemSrv != null) {
-            shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
-
-            new IgniteThread(shmemAcceptWorker).start();
-        }
-
-        nioSrvr.start();
-    }
-
-    /**
-     * Gets local process descriptor.
-     *
-     * @return Local process descriptor.
-     */
-    public HadoopProcessDescriptor localProcessDescriptor() {
-        return locProcDesc;
-    }
-
-    /**
-     * Gets filters used by communication.
-     *
-     * @return Filters array.
-     */
-    private GridNioFilter[] filters() {
-        return new GridNioFilter[] {
-            new GridNioAsyncNotifyFilter(gridName, execSvc, log),
-            new HandshakeAndBackpressureFilter(),
-            new HadoopMarshallerFilter(marsh),
-            new GridNioCodecFilter(new GridBufferedParser(directBuf, 
ByteOrder.nativeOrder()), log, false)
-        };
-    }
-
-    /**
-     * Recreates tpcSrvr socket instance.
-     *
-     * @return Server instance.
-     * @throws IgniteCheckedException Thrown if it's not possible to create 
server.
-     */
-    private GridNioServer<HadoopMessage> resetNioServer() throws 
IgniteCheckedException {
-        if (boundTcpPort >= 0)
-            throw new IgniteCheckedException("Tcp NIO server was already 
created on port " + boundTcpPort);
-
-        IgniteCheckedException lastEx = null;
-
-        // If configured TCP port is busy, find first available in range.
-        for (int port = locPort; port < locPort + locPortRange; port++) {
-            try {
-                GridNioServer<HadoopMessage> srvr =
-                    GridNioServer.<HadoopMessage>builder()
-                        .address(locHost)
-                        .port(port)
-                        .listener(srvLsnr)
-                        .logger(log.getLogger(GridNioServer.class))
-                        .selectorCount(selectorsCnt)
-                        .gridName(gridName)
-                        .tcpNoDelay(tcpNoDelay)
-                        .directBuffer(directBuf)
-                        .byteOrder(ByteOrder.nativeOrder())
-                        .socketSendBufferSize(sockSndBuf)
-                        .socketReceiveBufferSize(sockRcvBuf)
-                        .sendQueueLimit(msgQueueLimit)
-                        .directMode(false)
-                        .filters(filters())
-                        .build();
-
-                boundTcpPort = port;
-
-                // Ack Port the TCP server was bound to.
-                if (log.isInfoEnabled())
-                    log.info("Successfully bound to TCP port [port=" + 
boundTcpPort +
-                        ", locHost=" + locHost + ']');
-
-                return srvr;
-            }
-            catch (IgniteCheckedException e) {
-                lastEx = e;
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to bind to local port (will try next 
port within range) [port=" + port +
-                        ", locHost=" + locHost + ']');
-            }
-        }
-
-        // If free port wasn't found.
-        throw new IgniteCheckedException("Failed to bind to any port within 
range [startPort=" + locPort +
-            ", portRange=" + locPortRange + ", locHost=" + locHost + ']', 
lastEx);
-    }
-
-    /**
-     * Creates new shared memory communication server.
-     * @return Server.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws 
IgniteCheckedException {
-        if (boundTcpShmemPort >= 0)
-            throw new IgniteCheckedException("Shared memory server was already 
created on port " + boundTcpShmemPort);
-
-        if (shmemPort == -1 || U.isWindows())
-            return null;
-
-        IgniteCheckedException lastEx = null;
-
-        // If configured TCP port is busy, find first available in range.
-        for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
-            try {
-                IpcSharedMemoryServerEndpoint srv = new 
IpcSharedMemoryServerEndpoint(
-                    log.getLogger(IpcSharedMemoryServerEndpoint.class),
-                    locProcDesc.processId(), gridName);
-
-                srv.setPort(port);
-
-                srv.omitOutOfResourcesWarning(true);
-
-                srv.start();
-
-                boundTcpShmemPort = port;
-
-                // Ack Port the TCP server was bound to.
-                if (log.isInfoEnabled())
-                    log.info("Successfully bound shared memory communication 
to TCP port [port=" + boundTcpShmemPort +
-                        ", locHost=" + locHost + ']');
-
-                return srv;
-            }
-            catch (IgniteCheckedException e) {
-                lastEx = e;
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to bind to local port (will try next 
port within range) [port=" + port +
-                        ", locHost=" + locHost + ']');
-            }
-        }
-
-        // If free port wasn't found.
-        throw new IgniteCheckedException("Failed to bind shared memory 
communication to any port within range [startPort=" +
-            locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + 
']', lastEx);
-    }
-
-    /**
-     * Stops the server.
-     *
-     * @throws IgniteCheckedException
-     */
-    public void stop() throws IgniteCheckedException {
-        // Stop TCP server.
-        if (nioSrvr != null)
-            nioSrvr.stop();
-
-        U.cancel(shmemAcceptWorker);
-        U.join(shmemAcceptWorker, log);
-
-        U.cancel(shmemWorkers);
-        U.join(shmemWorkers, log);
-
-        shmemWorkers.clear();
-
-        // Force closing on stop (safety).
-        for (HadoopCommunicationClient client : clients.values())
-            client.forceClose();
-
-        // Clear resources.
-        nioSrvr = null;
-
-        boundTcpPort = -1;
-    }
-
-    /**
-     * Sends message to Hadoop process.
-     *
-     * @param desc
-     * @param msg
-     * @throws IgniteCheckedException
-     */
-    public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) 
throws
-        IgniteCheckedException {
-        assert desc != null;
-        assert msg != null;
-
-        if (log.isTraceEnabled())
-            log.trace("Sending message to Hadoop process [desc=" + desc + ", 
msg=" + msg + ']');
-
-        HadoopCommunicationClient client = null;
-
-        boolean closeOnRelease = true;
-
-        try {
-            client = reserveClient(desc);
-
-            client.sendMessage(desc, msg);
-
-            closeOnRelease = false;
-        }
-        finally {
-            if (client != null) {
-                if (closeOnRelease) {
-                    client.forceClose();
-
-                    clients.remove(desc.processId(), client);
-                }
-                else
-                    client.release();
-            }
-        }
-    }
-
-    /**
-     * Returns existing or just created client to node.
-     *
-     * @param desc Node to which client should be open.
-     * @return The existing or just created client.
-     * @throws IgniteCheckedException Thrown if any exception occurs.
-     */
-    private HadoopCommunicationClient reserveClient(HadoopProcessDescriptor 
desc) throws IgniteCheckedException {
-        assert desc != null;
-
-        UUID procId = desc.processId();
-
-        while (true) {
-            HadoopCommunicationClient client = clients.get(procId);
-
-            if (client == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Did not find client for remote process 
[locProcDesc=" + locProcDesc + ", desc=" +
-                        desc + ']');
-
-                // Do not allow concurrent connects.
-                Object sync = locks.lock(procId);
-
-                try {
-                    client = clients.get(procId);
-
-                    if (client == null) {
-                        HadoopCommunicationClient old = clients.put(procId, 
client = createNioClient(desc));
-
-                        assert old == null;
-                    }
-                }
-                finally {
-                    locks.unlock(procId, sync);
-                }
-
-                assert client != null;
-            }
-
-            if (client.reserve())
-                return client;
-            else
-                // Client has just been closed by idle worker. Help it and try 
again.
-                clients.remove(procId, client);
-        }
-    }
-
-    /**
-     * @param desc Process descriptor.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable protected HadoopCommunicationClient 
createNioClient(HadoopProcessDescriptor desc)
-        throws  IgniteCheckedException {
-        assert desc != null;
-
-        int shmemPort = desc.sharedMemoryPort();
-
-        // If remote node has shared memory server enabled and has the same 
set of MACs
-        // then we are likely to run on the same host and shared memory 
communication could be tried.
-        if (shmemPort != -1 && 
locProcDesc.parentNodeId().equals(desc.parentNodeId())) {
-            try {
-                return createShmemClient(desc, shmemPort);
-            }
-            catch (IgniteCheckedException e) {
-                if (e.hasCause(IpcOutOfSystemResourcesException.class))
-                    // Has cause or is itself the 
IpcOutOfSystemResourcesException.
-                    LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
-                else if (log.isDebugEnabled())
-                    log.debug("Failed to establish shared memory connection 
with local hadoop process: " +
-                        desc);
-            }
-        }
-
-        return createTcpClient(desc);
-    }
-
-    /**
-     * @param desc Process descriptor.
-     * @param port Port.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable protected HadoopCommunicationClient 
createShmemClient(HadoopProcessDescriptor desc, int port)
-        throws IgniteCheckedException {
-        int attempt = 1;
-
-        int connectAttempts = 1;
-
-        long connTimeout0 = connTimeout;
-
-        while (true) {
-            IpcEndpoint clientEndpoint;
-
-            try {
-                clientEndpoint = new IpcSharedMemoryClientEndpoint(port, 
(int)connTimeout, log);
-            }
-            catch (IgniteCheckedException e) {
-                // Reconnect for the second time, if connection is not 
established.
-                if (connectAttempts < 2 && X.hasCause(e, 
ConnectException.class)) {
-                    connectAttempts++;
-
-                    continue;
-                }
-
-                throw e;
-            }
-
-            HadoopCommunicationClient client = null;
-
-            try {
-                ShmemWorker worker = new ShmemWorker(clientEndpoint, false);
-
-                shmemWorkers.add(worker);
-
-                GridNioSession ses = worker.session();
-
-                HandshakeFinish fin = new HandshakeFinish();
-
-                // We are in lock, it is safe to get session and attach
-                ses.addMeta(HANDSHAKE_FINISH_META, fin);
-
-                client = new HadoopTcpNioCommunicationClient(ses);
-
-                new IgniteThread(worker).start();
-
-                fin.await(connTimeout0);
-            }
-            catch (HadoopHandshakeTimeoutException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Handshake timed out (will retry with increased 
timeout) [timeout=" + connTimeout0 +
-                        ", err=" + e.getMessage() + ", client=" + client + 
']');
-
-                if (client != null)
-                    client.forceClose();
-
-                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake timedout (will stop attempts to 
perform the handshake) " +
-                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + 
maxConnTimeout +
-                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                            ", err=" + e.getMessage() + ", client=" + client + 
']');
-
-                    throw e;
-                }
-                else {
-                    attempt++;
-
-                    connTimeout0 *= 2;
-
-                    continue;
-                }
-            }
-            catch (RuntimeException | Error e) {
-                if (log.isDebugEnabled())
-                    log.debug(
-                        "Caught exception (will close client) [err=" + 
e.getMessage() + ", client=" + client + ']');
-
-                if (client != null)
-                    client.forceClose();
-
-                throw e;
-            }
-
-            return client;
-        }
-    }
-
-    /**
-     * Establish TCP connection to remote hadoop process and returns client.
-     *
-     * @param desc Process descriptor.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected HadoopCommunicationClient 
createTcpClient(HadoopProcessDescriptor desc) throws IgniteCheckedException {
-        String addr = desc.address();
-
-        int port = desc.tcpPort();
-
-        if (log.isDebugEnabled())
-            log.debug("Trying to connect to remote process [locProcDesc=" + 
locProcDesc + ", desc=" + desc + ']');
-
-        boolean conn = false;
-        HadoopTcpNioCommunicationClient client = null;
-        IgniteCheckedException errs = null;
-
-        int connectAttempts = 1;
-
-        long connTimeout0 = connTimeout;
-
-        int attempt = 1;
-
-        while (!conn) { // Reconnection on handshake timeout.
-            try {
-                SocketChannel ch = SocketChannel.open();
-
-                ch.configureBlocking(true);
-
-                ch.socket().setTcpNoDelay(tcpNoDelay);
-                ch.socket().setKeepAlive(true);
-
-                if (sockRcvBuf > 0)
-                    ch.socket().setReceiveBufferSize(sockRcvBuf);
-
-                if (sockSndBuf > 0)
-                    ch.socket().setSendBufferSize(sockSndBuf);
-
-                ch.socket().connect(new InetSocketAddress(addr, port), 
(int)connTimeout);
-
-                HandshakeFinish fin = new HandshakeFinish();
-
-                GridNioSession ses = nioSrvr.createSession(ch, 
F.asMap(HANDSHAKE_FINISH_META, fin)).get();
-
-                client = new HadoopTcpNioCommunicationClient(ses);
-
-                if (log.isDebugEnabled())
-                    log.debug("Waiting for handshake finish for client: " + 
client);
-
-                fin.await(connTimeout0);
-
-                conn = true;
-            }
-            catch (HadoopHandshakeTimeoutException e) {
-                if (client != null) {
-                    client.forceClose();
-
-                    client = null;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug(
-                        "Handshake timedout (will retry with increased 
timeout) [timeout=" + connTimeout0 +
-                            ", desc=" + desc + ", port=" + port + ", err=" + e 
+ ']');
-
-                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake timed out (will stop attempts to 
perform the handshake) " +
-                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + 
maxConnTimeout +
-                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                            ", err=" + e.getMessage() + ", addr=" + addr + 
']');
-
-                    if (errs == null)
-                        errs = new IgniteCheckedException("Failed to connect 
to remote Hadoop process " +
-                            "(is process still running?) [desc=" + desc + ", 
addrs=" + addr + ']');
-
-                    errs.addSuppressed(e);
-
-                    break;
-                }
-                else {
-                    attempt++;
-
-                    connTimeout0 *= 2;
-
-                    // Continue loop.
-                }
-            }
-            catch (Exception e) {
-                if (client != null) {
-                    client.forceClose();
-
-                    client = null;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Client creation failed [addr=" + addr + ", 
port=" + port +
-                        ", err=" + e + ']');
-
-                if (X.hasCause(e, SocketTimeoutException.class))
-                    LT.warn(log, null, "Connect timed out (consider increasing 
'connTimeout' " +
-                        "configuration property) [addr=" + addr + ", port=" + 
port + ']');
-
-                if (errs == null)
-                    errs = new IgniteCheckedException("Failed to connect to 
remote Hadoop process (is process still running?) " +
-                        "[desc=" + desc + ", addrs=" + addr + ']');
-
-                errs.addSuppressed(e);
-
-                // Reconnect for the second time, if connection is not 
established.
-                if (connectAttempts < 2 &&
-                    (e instanceof ConnectException || X.hasCause(e, 
ConnectException.class))) {
-                    connectAttempts++;
-
-                    continue;
-                }
-
-                break;
-            }
-        }
-
-        if (client == null) {
-            assert errs != null;
-
-            if (X.hasCause(errs, ConnectException.class))
-                LT.warn(log, null, "Failed to connect to a remote Hadoop 
process (is process still running?). " +
-                    "Make sure operating system firewall is disabled on local 
and remote host) " +
-                    "[addrs=" + addr + ", port=" + port + ']');
-
-            throw errs;
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Created client: " + client);
-
-        return client;
-    }
-
-    /**
-     * @param desc Sender process descriptor.
-     * @param msg Communication message.
-     */
-    protected void notifyListener(HadoopProcessDescriptor desc, HadoopMessage 
msg) {
-        HadoopMessageListener lsnr = this.lsnr;
-
-        if (lsnr != null)
-            // Notify listener of a new message.
-            lsnr.onMessageReceived(desc, msg);
-        else if (log.isDebugEnabled())
-            log.debug("Received communication message without any registered 
listeners (will ignore) " +
-                "[senderProcDesc=" + desc + ", msg=" + msg + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopExternalCommunication.class, this);
-    }
-
-    /**
-     * This worker takes responsibility to shut the server down when stopping,
-     * No other thread shall stop passed server.
-     */
-    private class ShmemAcceptWorker extends GridWorker {
-        /** */
-        private final IpcSharedMemoryServerEndpoint srv;
-
-        /**
-         * @param srv Server.
-         */
-        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
-            super(gridName, "shmem-communication-acceptor", 
HadoopExternalCommunication.this.log);
-
-            this.srv = srv;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            try {
-                while (!Thread.interrupted()) {
-                    ShmemWorker e = new ShmemWorker(srv.accept(), true);
-
-                    shmemWorkers.add(e);
-
-                    new IgniteThread(e).start();
-                }
-            }
-            catch (IgniteCheckedException e) {
-                if (!isCancelled())
-                    U.error(log, "Shmem server failed.", e);
-            }
-            finally {
-                srv.close();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            super.cancel();
-
-            srv.close();
-        }
-    }
-
-    /**
-     *
-     */
-    private class ShmemWorker extends GridWorker {
-        /** */
-        private final IpcEndpoint endpoint;
-
-        /** Adapter. */
-        private HadoopIpcToNioAdapter<HadoopMessage> adapter;
-
-        /**
-         * @param endpoint Endpoint.
-         */
-        private ShmemWorker(IpcEndpoint endpoint, boolean accepted) {
-            super(gridName, "shmem-worker", 
HadoopExternalCommunication.this.log);
-
-            this.endpoint = endpoint;
-
-            adapter = new HadoopIpcToNioAdapter<>(
-                HadoopExternalCommunication.this.log,
-                endpoint,
-                accepted,
-                srvLsnr,
-                filters());
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            try {
-                adapter.serve();
-            }
-            finally {
-                shmemWorkers.remove(this);
-
-                endpoint.close();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            super.cancel();
-
-            endpoint.close();
-        }
-
-        /** @{@inheritDoc} */
-        @Override protected void cleanup() {
-            super.cleanup();
-
-            endpoint.close();
-        }
-
-        /** @{@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ShmemWorker.class, this);
-        }
-
-        /**
-         * @return NIO session for this worker.
-         */
-        public GridNioSession session() {
-            return adapter.session();
-        }
-    }
-
-    /**
-     *
-     */
-    private static class HandshakeFinish {
-        /** Await latch. */
-        private CountDownLatch latch = new CountDownLatch(1);
-
-        /**
-         * Finishes handshake.
-         */
-        public void finish() {
-            latch.countDown();
-        }
-
-        /**
-         * @param time Time to wait.
-         * @throws HadoopHandshakeTimeoutException If failed to wait.
-         */
-        public void await(long time) throws HadoopHandshakeTimeoutException {
-            try {
-                if (!latch.await(time, TimeUnit.MILLISECONDS))
-                    throw new HadoopHandshakeTimeoutException("Failed to wait 
for handshake to finish [timeout=" +
-                        time + ']');
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new HadoopHandshakeTimeoutException("Failed to wait for 
handshake to finish (thread was " +
-                    "interrupted) [timeout=" + time + ']', e);
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private class HandshakeAndBackpressureFilter extends GridNioFilterAdapter {
-        /**
-         * Assigns filter name to a filter.
-         */
-        protected HandshakeAndBackpressureFilter() {
-            super("HadoopHandshakeFilter");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionOpened(final GridNioSession ses) throws 
IgniteCheckedException {
-            if (ses.accepted()) {
-                if (log.isDebugEnabled())
-                    log.debug("Accepted connection, initiating handshake: " + 
ses);
-
-                // Server initiates handshake.
-                ses.send(locIdMsg).listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> fut) {
-                        try {
-                            // Make sure there were no errors.
-                            fut.get();
-                        }
-                        catch (IgniteCheckedException e) {
-                            log.warning("Failed to send handshake message, 
will close session: " + ses, e);
-
-                            ses.close();
-                        }
-                    }
-                });
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionClosed(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionClosed(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onExceptionCaught(GridNioSession ses, 
IgniteCheckedException ex) throws IgniteCheckedException {
-            proceedExceptionCaught(ses, ex);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, 
Object msg) throws IgniteCheckedException {
-            if (ses.meta(PROCESS_META) == null && !(msg instanceof 
ProcessHandshakeMessage))
-                log.warning("Writing message before handshake has finished 
[ses=" + ses + ", msg=" + msg + ']');
-
-            return proceedSessionWrite(ses, msg);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(GridNioSession ses, Object 
msg) throws IgniteCheckedException {
-            HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
-            UUID rmtProcId = desc == null ? null : desc.processId();
-
-            if (rmtProcId == null) {
-                if (!(msg instanceof ProcessHandshakeMessage)) {
-                    log.warning("Invalid handshake message received, will 
close connection [ses=" + ses +
-                        ", msg=" + msg + ']');
-
-                    ses.close();
-
-                    return;
-                }
-
-                ProcessHandshakeMessage nId = (ProcessHandshakeMessage)msg;
-
-                if (log.isDebugEnabled())
-                    log.debug("Received handshake message [ses=" + ses + ", 
msg=" + msg + ']');
-
-                ses.addMeta(PROCESS_META, nId.processDescriptor());
-
-                if (!ses.accepted())
-                    // Send handshake reply.
-                    ses.send(locIdMsg);
-                else {
-                    //
-                    rmtProcId = nId.processDescriptor().processId();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Finished handshake with remote client: " + 
ses);
-
-                    Object sync = locks.tryLock(rmtProcId);
-
-                    if (sync != null) {
-                        try {
-                            if (clients.get(rmtProcId) == null) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Will reuse session for 
descriptor: " + rmtProcId);
-
-                                // Handshake finished flag is true.
-                                clients.put(rmtProcId, new 
HadoopTcpNioCommunicationClient(ses));
-                            }
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Will not reuse client as 
another already exists [locProcDesc=" +
-                                        locProcDesc + ", desc=" + desc + ']');
-                            }
-                        }
-                        finally {
-                            locks.unlock(rmtProcId, sync);
-                        }
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Concurrent connection is being 
established, will not reuse client session [" +
-                                "locProcDesc=" + locProcDesc + ", desc=" + 
desc + ']');
-                    }
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Handshake is finished for session [ses=" + ses 
+ ", locProcDesc=" + locProcDesc + ']');
-
-                HandshakeFinish to = ses.meta(HANDSHAKE_FINISH_META);
-
-                if (to != null)
-                    to.finish();
-
-                // Notify session opened (both parties).
-                proceedSessionOpened(ses);
-            }
-            else {
-                if (msgQueueLimit > 0) {
-                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
-                    if (tracker == null) {
-                        GridNioMessageTracker old = ses.addMeta(TRACKER_META, 
tracker =
-                            new GridNioMessageTracker(ses, msgQueueLimit));
-
-                        assert old == null;
-                    }
-
-                    tracker.onMessageReceived();
-                }
-
-                proceedMessageReceived(ses, msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession 
ses) throws IgniteCheckedException {
-            return proceedSessionClose(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionIdleTimeout(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionIdleTimeout(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionWriteTimeout(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionWriteTimeout(ses);
-        }
-    }
-
-    /**
-     * Process ID message.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class ProcessHandshakeMessage implements HadoopMessage {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Node ID. */
-        private HadoopProcessDescriptor procDesc;
-
-        /** */
-        public ProcessHandshakeMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param procDesc Process descriptor.
-         */
-        private ProcessHandshakeMessage(HadoopProcessDescriptor procDesc) {
-            this.procDesc = procDesc;
-        }
-
-        /**
-         * @return Process ID.
-         */
-        public HadoopProcessDescriptor processDescriptor() {
-            return procDesc;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeObject(procDesc);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            procDesc = (HadoopProcessDescriptor)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ProcessHandshakeMessage.class, this);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
deleted file mode 100644
index b2a85e1..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/** Internal exception class for proper timeout handling. */
-class HadoopHandshakeTimeoutException extends IgniteCheckedException {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * @param msg Message.
-     */
-    HadoopHandshakeTimeoutException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * @param msg Message.
-     * @param cause Cause.
-     */
-    HadoopHandshakeTimeoutException(String msg, @Nullable Throwable cause) {
-        super(msg, cause);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
deleted file mode 100644
index a8de999..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.ipc.IpcEndpoint;
-import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
-import org.apache.ignite.internal.util.nio.GridNioFilterChain;
-import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioServerListener;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
-
-/**
- * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically 
shared memory IPC)
- * communications.
- *
- * Note that this class consumes an entire thread inside {@link #serve()} 
method
- * in order to serve one {@link 
org.apache.ignite.internal.util.ipc.IpcEndpoint}.
- */
-public class HadoopIpcToNioAdapter<T> {
-    /** */
-    private final IpcEndpoint endp;
-
-    /** */
-    private final GridNioFilterChain<T> chain;
-
-    /** */
-    private final GridNioSessionImpl ses;
-
-    /** */
-    private final AtomicReference<CountDownLatch> latchRef = new 
AtomicReference<>();
-
-    /** */
-    private final ByteBuffer writeBuf;
-
-    /**
-     * @param log Log.
-     * @param endp Endpoint.
-     * @param lsnr Listener.
-     * @param filters Filters.
-     */
-    public HadoopIpcToNioAdapter(IgniteLogger log, IpcEndpoint endp, boolean 
accepted,
-        GridNioServerListener<T> lsnr, GridNioFilter... filters) {
-        this.endp = endp;
-
-        chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
-        ses = new GridNioSessionImpl(chain, null, null, accepted);
-
-        writeBuf = ByteBuffer.allocate(8 << 10);
-
-        writeBuf.order(ByteOrder.nativeOrder());
-    }
-
-    /**
-     * Serves given set of listeners repeatedly reading data from the endpoint.
-     *
-     * @throws InterruptedException If interrupted.
-     */
-    public void serve() throws InterruptedException {
-        try {
-            chain.onSessionOpened(ses);
-
-            InputStream in = endp.inputStream();
-
-            ByteBuffer readBuf = ByteBuffer.allocate(8 << 10);
-
-            readBuf.order(ByteOrder.nativeOrder());
-
-            assert readBuf.hasArray();
-
-            while (!Thread.interrupted()) {
-                int pos = readBuf.position();
-
-                int read = in.read(readBuf.array(), pos, readBuf.remaining());
-
-                if (read > 0) {
-                    readBuf.position(0);
-                    readBuf.limit(pos + read);
-
-                    chain.onMessageReceived(ses, readBuf);
-
-                    if (readBuf.hasRemaining())
-                        readBuf.compact();
-                    else
-                        readBuf.clear();
-
-                    CountDownLatch latch = latchRef.get();
-
-                    if (latch != null)
-                        latch.await();
-                }
-                else if (read < 0) {
-                    endp.close();
-
-                    break; // And close below.
-                }
-            }
-
-            // Assuming remote end closed connection - pushing event from head 
to tail.
-            chain.onSessionClosed(ses);
-        }
-        catch (Exception e) {
-            chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to 
read from IPC endpoint.", e));
-        }
-    }
-
-    /**
-     * Gets dummy session for this adapter.
-     *
-     * @return Session.
-     */
-    public GridNioSession session() {
-        return ses;
-    }
-
-    /**
-     * Handles write events on chain.
-     *
-     * @param msg Buffer to send.
-     * @return Send result.
-     */
-    private GridNioFuture<?> send(ByteBuffer msg) {
-        assert writeBuf.hasArray();
-
-        try {
-            while (msg.hasRemaining()) {
-                writeBuf.clear();
-
-                writeBuf.put(msg);
-
-                endp.outputStream().write(writeBuf.array(), 0, 
writeBuf.position());
-            }
-        }
-        catch (IOException | IgniteCheckedException e) {
-            return new GridNioFinishedFuture<Object>(e);
-        }
-
-        return new GridNioFinishedFuture<>((Object)null);
-    }
-
-    /**
-     * Filter forwarding messages from chain's head to this server.
-     */
-    private class HeadFilter extends GridNioFilterAdapter {
-        /**
-         * Assigns filter name.
-         */
-        protected HeadFilter() {
-            super("HeadFilter");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionOpened(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionOpened(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionClosed(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionClosed(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onExceptionCaught(GridNioSession ses, 
IgniteCheckedException ex) throws IgniteCheckedException {
-            proceedExceptionCaught(ses, ex);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, 
Object msg) {
-            assert ses == HadoopIpcToNioAdapter.this.ses : "ses=" + ses +
-                ", this.ses=" + HadoopIpcToNioAdapter.this.ses;
-
-            return send((ByteBuffer)msg);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(GridNioSession ses, Object 
msg) throws IgniteCheckedException {
-            proceedMessageReceived(ses, msg);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) 
throws IgniteCheckedException {
-            // This call should be synced externally to avoid races.
-            boolean b = latchRef.compareAndSet(null, new CountDownLatch(1));
-
-            assert b;
-
-            return new GridNioFinishedFuture<>(b);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) 
throws IgniteCheckedException {
-            // This call should be synced externally to avoid races.
-            CountDownLatch latch = latchRef.getAndSet(null);
-
-            if (latch != null)
-                latch.countDown();
-
-            return new GridNioFinishedFuture<Object>(latch != null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession 
ses) {
-            assert ses == HadoopIpcToNioAdapter.this.ses;
-
-            boolean closed = HadoopIpcToNioAdapter.this.ses.setClosed();
-
-            if (closed)
-                endp.close();
-
-            return new GridNioFinishedFuture<>(closed);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionIdleTimeout(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionIdleTimeout(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionWriteTimeout(GridNioSession ses) throws 
IgniteCheckedException {
-            proceedSessionWriteTimeout(ses);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
deleted file mode 100644
index 3f79469..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
-import org.apache.ignite.internal.util.nio.GridNioFuture;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.marshaller.Marshaller;
-
-/**
- * Serialization filter.
- */
-public class HadoopMarshallerFilter extends GridNioFilterAdapter {
-    /** Marshaller. */
-    private Marshaller marshaller;
-
-    /**
-     * @param marshaller Marshaller to use.
-     */
-    public HadoopMarshallerFilter(Marshaller marshaller) {
-        super("HadoopMarshallerFilter");
-
-        this.marshaller = marshaller;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSessionOpened(GridNioSession ses) throws 
IgniteCheckedException {
-        proceedSessionOpened(ses);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSessionClosed(GridNioSession ses) throws 
IgniteCheckedException {
-        proceedSessionClosed(ses);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onExceptionCaught(GridNioSession ses, 
IgniteCheckedException ex) throws IgniteCheckedException {
-        proceedExceptionCaught(ses, ex);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, 
Object msg) throws IgniteCheckedException {
-        assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
-
-        return proceedSessionWrite(ses, marshaller.marshal(msg));
-    }
-
-    @Override public void onMessageReceived(GridNioSession ses, Object msg) 
throws IgniteCheckedException {
-        assert msg instanceof byte[];
-
-        // Always unmarshal with system classloader.
-        proceedMessageReceived(ses, marshaller.unmarshal((byte[])msg, null));
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) 
throws IgniteCheckedException {
-        return proceedSessionClose(ses);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSessionIdleTimeout(GridNioSession ses) throws 
IgniteCheckedException {
-        proceedSessionIdleTimeout(ses);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSessionWriteTimeout(GridNioSession ses) throws 
IgniteCheckedException {
-        proceedSessionWriteTimeout(ses);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
deleted file mode 100644
index 6d50f43..0000000
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-
-/**
- * Hadoop communication message listener.
- */
-public interface HadoopMessageListener {
-    /**
-     * @param desc Process descriptor.
-     * @param msg Hadoop message.
-     */
-    public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage 
msg);
-
-    /**
-     * Called when connection to remote process was lost.
-     *
-     * @param desc Process descriptor.
-     */
-    public void onConnectionLost(HadoopProcessDescriptor desc);
-}
\ No newline at end of file

Reply via email to