This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a1abd2ca936 IGNITE-26111 TcpDiscoverySpi uses MessageSerializer 
(#12243)
a1abd2ca936 is described below

commit a1abd2ca9360f1857008c90e6f4746b485dc258d
Author: Maksim Timonin <[email protected]>
AuthorDate: Thu Nov 13 16:26:15 2025 +0300

    IGNITE-26111 TcpDiscoverySpi uses MessageSerializer (#12243)
---
 .../discovery/DiscoveryMessageFactory.java         |  31 +++
 .../ignite/spi/discovery/tcp/ClientImpl.java       |  52 ++--
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  71 +++---
 .../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java |  11 +
 .../spi/discovery/tcp/TcpDiscoveryIoSession.java   | 275 +++++++++++++++++++++
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |  63 +++--
 .../tcp/messages/TcpDiscoveryAbstractMessage.java  |  11 +
 .../messages/TcpDiscoveryCheckFailedMessage.java   |  23 +-
 .../ignite/internal/IgniteClientRejoinTest.java    |   8 +-
 .../IgniteDiscoveryMassiveNodeFailTest.java        |   8 +-
 .../CacheClientsConcurrentStartTest.java           |  10 +-
 .../dht/IgniteCacheTopologySplitAbstractTest.java  |   9 +-
 .../IncrementalSnapshotJoiningClientTest.java      |  10 +-
 .../IgniteTcpCommunicationConnectOnInitTest.java   |   9 +-
 .../TcpCommunicationSpiSkipMessageSendTest.java    |  13 +-
 .../discovery/LongClientConnectToClusterTest.java  |   7 +-
 .../spi/discovery/tcp/BlockTcpDiscoverySpi.java    |   6 +-
 .../tcp/DiscoveryUnmarshalVulnerabilityTest.java   |   1 +
 .../spi/discovery/tcp/IgniteClientConnectTest.java |  11 +-
 ...cpClientDiscoverySpiFailureTimeoutSelfTest.java |  16 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java         |  10 +-
 .../tcp/TcpDiscoveryCoordinatorFailureTest.java    |   8 +-
 .../discovery/tcp/TcpDiscoveryFailedJoinTest.java  |   7 +-
 .../tcp/TcpDiscoveryNetworkIssuesTest.java         |  15 +-
 .../tcp/TcpDiscoveryNodeJoinAndFailureTest.java    |   4 +-
 .../TcpDiscoveryPendingMessageDeliveryTest.java    |   5 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java    |  59 ++---
 .../tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java |   7 +-
 .../tcp/TcpDiscoverySpiReconnectDelayTest.java     |   7 +-
 .../tcp/TcpDiscoverySslSecuredUnsecuredTest.java   |   7 +-
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java     |   6 +-
 31 files changed, 537 insertions(+), 243 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
new file mode 100644
index 00000000000..c30286a909a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import 
org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
+
+/** Message factory for discovery messages. */
+public class DiscoveryMessageFactory implements MessageFactoryProvider {
+    /** {@inheritDoc} */
+    @Override public void registerAll(MessageFactory factory) {
+        factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new 
TcpDiscoveryCheckFailedMessageSerializer());
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 643d4d3fb25..caf8a530ffa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -17,11 +17,8 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InterruptedIOException;
-import java.io.OutputStream;
 import java.io.StreamCorruptedException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -719,13 +716,13 @@ class ClientImpl extends TcpDiscoveryImpl {
             boolean openSock = false;
 
             Socket sock = null;
-            OutputStream out;
 
             try {
                 long tsNanos = System.nanoTime();
 
                 sock = spi.openSocket(addr, timeoutHelper);
-                out = spi.socketStream(sock);
+
+                TcpDiscoveryIoSession ses = createSession(sock);
 
                 openSock = true;
 
@@ -733,9 +730,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, out, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeMessage(ses, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, 
null, ackTimeout0);
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, 
ackTimeout0);
 
                 UUID rmtNodeId = res.creatorNodeId();
 
@@ -788,7 +785,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (msg instanceof TraceableMessage)
                     tracing.messages().beforeSend((TraceableMessage)msg);
 
-                spi.writeToSocket(sock, out, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeMessage(ses, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
 
@@ -1179,7 +1176,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     + ":" + sockStream.sock.getPort());
 
                 try {
-                    InputStream in = sockStream.stream();
+                    TcpDiscoveryIoSession ses = createSession(sock);
 
                     assert sock.getKeepAlive() && sock.getTcpNoDelay() : 
"Socket wasn't configured properly:" +
                         " KeepAlive " + sock.getKeepAlive() +
@@ -1189,7 +1186,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         TcpDiscoveryAbstractMessage msg;
 
                         try {
-                            msg = U.unmarshal(spi.marshaller(), in, 
U.resolveClassLoader(spi.ignite().configuration()));
+                            msg = spi.readMessage(ses, sock.getSoTimeout());
                         }
                         catch (IgniteCheckedException e) {
                             if (log.isDebugEnabled())
@@ -1266,6 +1263,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private Socket sock;
 
+        /** */
+        private TcpDiscoveryIoSession ses;
+
         /** */
         private boolean clientAck;
 
@@ -1333,6 +1333,8 @@ class ClientImpl extends TcpDiscoveryImpl {
             synchronized (mux) {
                 this.sock = sock;
 
+                ses = createSession(sock);
+
                 this.clientAck = clientAck;
 
                 unackedMsg = null;
@@ -1387,11 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         msg.client(true);
 
                         try {
-                            spi.writeToSocket(
-                                sock,
-                                spi.socketStream(sock),
-                                msg,
-                                sockTimeout);
+                            spi.writeMessage(ses, msg, sockTimeout);
                         }
                         catch (IOException | IgniteCheckedException e) {
                             if (log.isDebugEnabled()) {
@@ -1434,11 +1432,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    spi.writeToSocket(
-                        sock,
-                        spi.socketStream(sock),
-                        msg,
-                        sockTimeout);
+                    spi.writeMessage(ses, msg, sockTimeout);
 
                     IgniteUuid latencyCheckId = msg instanceof 
TcpDiscoveryRingLatencyCheckMessage ?
                         msg.id() : null;
@@ -1601,6 +1595,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     clientAck = joinRes.get2();
 
                     Socket sock = sockStream.socket();
+                    TcpDiscoveryIoSession ses = createSession(sock);
 
                     if (isInterrupted())
                         throw new InterruptedException();
@@ -1612,8 +1607,6 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         sock.setSoTimeout((int)spi.netTimeout);
 
-                        InputStream in = sockStream.stream();
-
                         assert sock.getKeepAlive() && sock.getTcpNoDelay() : 
"Socket wasn't configured properly:" +
                             " KeepAlive " + sock.getKeepAlive() +
                             " TcpNoDelay " + sock.getTcpNoDelay();
@@ -1621,8 +1614,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
-                            TcpDiscoveryAbstractMessage msg = 
U.unmarshal(spi.marshaller(), in,
-                                
U.resolveClassLoader(spi.ignite().configuration()));
+                            TcpDiscoveryAbstractMessage msg = 
spi.readMessage(ses, sock.getSoTimeout());
 
                             if (msg instanceof 
TcpDiscoveryClientReconnectMessage) {
                                 TcpDiscoveryClientReconnectMessage res = 
(TcpDiscoveryClientReconnectMessage)msg;
@@ -2769,9 +2761,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private final Socket sock;
 
-        /** */
-        private final InputStream in;
-
         /**
          * @param sock Socket.
          * @throws IOException If failed to create stream.
@@ -2780,8 +2769,6 @@ class ClientImpl extends TcpDiscoveryImpl {
             assert sock != null;
 
             this.sock = sock;
-
-            this.in = new BufferedInputStream(sock.getInputStream());
         }
 
         /**
@@ -2792,13 +2779,6 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         }
 
-        /**
-         * @return Socket input stream.
-         */
-        InputStream stream() {
-            return in;
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return sock.toString();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7905a5a8ac0..8b5bdadbebd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -17,11 +17,8 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectStreamException;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.StreamCorruptedException;
 import java.net.ConnectException;
@@ -877,11 +874,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         openedSock = true;
 
-                        spi.writeToSocket(sock, spi.socketStream(sock), new 
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+                        TcpDiscoveryIoSession ses = createSession(sock);
+
+                        spi.writeMessage(ses, new 
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
                             
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                        TcpDiscoveryPingResponse res = spi.readMessage(sock, 
null, timeoutHelper.nextTimeoutChunk(
-                            spi.getAckTimeout()));
+                        TcpDiscoveryPingResponse res = spi.readMessage(ses, 
timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()));
 
                         if (locNodeId.equals(res.creatorNodeId())) {
                             if (log.isDebugEnabled())
@@ -1469,16 +1467,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                 long tsNanos = System.nanoTime();
 
                 sock = spi.openSocket(addr, timeoutHelper);
+                TcpDiscoveryIoSession ses = createSession(sock);
 
                 openSock = true;
 
                 TcpDiscoveryHandshakeRequest req = new 
TcpDiscoveryHandshakeRequest(locNodeId);
 
                 // Handshake.
-                spi.writeToSocket(sock, spi.socketStream(sock), req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeMessage(ses, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, 
null, timeoutHelper.nextTimeoutChunk(
-                    ackTimeout0));
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, 
timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                 if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                     boolean ignore = false;
@@ -1510,7 +1508,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Send message.
                 tsNanos = System.nanoTime();
 
-                spi.writeToSocket(sock, spi.socketStream(sock), msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeMessage(ses, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 long tsNanos0 = System.nanoTime();
 
@@ -2957,8 +2955,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
-        /** Output stream. */
-        private OutputStream out;
+        /** IO session. */
+        private TcpDiscoveryIoSession ses;
 
         /** Last time status message has been sent. */
         private long lastTimeStatusMsgSentNanos;
@@ -3522,7 +3520,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 sock = spi.openSocket(addr, timeoutHelper);
 
-                                out = spi.socketStream(sock);
+                                ses = createSession(sock);
 
                                 openSock = true;
 
@@ -3540,16 +3538,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         "] with timeout " + 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                                 }
 
-                                spi.writeToSocket(sock, out, hndMsg,
-                                    
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                                spi.writeMessage(ses, hndMsg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 if (log.isDebugEnabled()) {
                                     log.debug("Reading handshake response with 
timeout " +
                                         
timeoutHelper.nextTimeoutChunk(ackTimeout0));
                                 }
 
-                                TcpDiscoveryHandshakeResponse res = 
spi.readMessage(sock, null,
-                                    
timeoutHelper.nextTimeoutChunk(ackTimeout0));
+                                TcpDiscoveryHandshakeResponse res = 
spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Handshake response: " + res);
@@ -3748,8 +3744,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         timeoutHelper = 
serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
 
                                     try {
-                                        spi.writeToSocket(sock, out, 
pendingMsg, timeoutHelper.nextTimeoutChunk(
-                                            spi.getSocketTimeout()));
+                                        spi.writeMessage(ses, pendingMsg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                                     }
                                     finally {
                                         clearNodeAddedMessage(pendingMsg);
@@ -3800,11 +3795,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (latencyCheck && log.isInfoEnabled())
                                     log.info("Latency check message has been 
written to socket: " + msg.id());
 
-                                spi.writeToSocket(
-                                    sock,
-                                    out,
-                                    msg,
-                                    
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                                spi.writeMessage(ses, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 long tsNanos0 = System.nanoTime();
 
@@ -5573,7 +5564,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
                 else if (leftNode.equals(next) && sock != null) {
                     try {
-                        spi.writeToSocket(sock, out, msg, 
spi.failureDetectionTimeoutEnabled() ?
+                        spi.writeMessage(ses, msg, 
spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : 
spi.getSocketTimeout());
 
                         if (log.isDebugEnabled())
@@ -6696,6 +6687,9 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Thread that reads messages from the socket created for incoming 
connections.
      */
     private class SocketReader extends IgniteSpiThread {
+        /** IO session. */
+        private final TcpDiscoveryIoSession ses;
+
         /** Socket to read data from. */
         private final Socket sock;
 
@@ -6712,6 +6706,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             this.sock = sock;
 
+            ses = createSession(sock);
+
             setPriority(spi.threadPri);
         }
 
@@ -6731,8 +6727,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             boolean srvSock;
 
             try {
-                InputStream in;
-
                 try {
                     // Set socket options.
                     spi.configureSocketOptions(sock);
@@ -6744,15 +6738,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     for (IgniteInClosure<Socket> connLsnr : 
spi.incomeConnLsnrs)
                         connLsnr.apply(sock);
 
-                    int rcvBufSize = sock.getReceiveBufferSize();
-
-                    in = new BufferedInputStream(sock.getInputStream(), 
rcvBufSize > 0 ? rcvBufSize : 8192);
-
                     byte[] buf = new byte[4];
                     int read = 0;
 
                     while (read < buf.length) {
-                        int r = in.read(buf, read, buf.length - read);
+                        int r = sock.getInputStream().read(buf, read, 
buf.length - read);
 
                         if (r >= 0)
                             read += r;
@@ -6787,7 +6777,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Restore timeout.
                     sock.setSoTimeout(timeout);
 
-                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, 
in, spi.netTimeout);
+                    TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, 
spi.netTimeout);
 
                     // Ping.
                     if (msg instanceof TcpDiscoveryPingRequest) {
@@ -6812,7 +6802,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     
res.clientExists(clientWorker.ping(timeoutHelper));
                             }
 
-                            spi.writeToSocket(sock, spi.socketStream(sock), 
res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                            spi.writeMessage(ses, res, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                             if (!(sock instanceof SSLSocket))
                                 sock.shutdownOutput();
@@ -6905,7 +6895,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             spi.getEffectiveSocketTimeout(srvSock) + " to " + 
rmtAddr + ":" + sock.getPort());
                     }
 
-                    spi.writeToSocket(sock, spi.socketStream(sock), res, 
spi.getEffectiveSocketTimeout(srvSock));
+                    spi.writeMessage(ses, res, 
spi.getEffectiveSocketTimeout(srvSock));
 
                     // It can happen if a remote node is stopped and it has a 
loopback address in the list of addresses,
                     // the local node sends a handshake request message on the 
loopback address, so we get here.
@@ -7028,8 +7018,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     try {
                         SecurityUtils.serializeVersion(1);
 
-                        TcpDiscoveryAbstractMessage msg = 
U.unmarshal(spi.marshaller(), in,
-                            
U.resolveClassLoader(spi.ignite().configuration()));
+                        // Use infinite timeout for accepting new messages.
+                        TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, 
0);
 
                         msg.senderNodeId(nodeId);
 
@@ -7713,6 +7703,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Node ID. */
         private final UUID clientNodeId;
 
+        /** */
+        private final TcpDiscoveryIoSession ses;
+
         /** Socket. */
         private final Socket sock;
 
@@ -7741,6 +7734,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.sock = sock;
             this.clientNodeId = clientNodeId;
 
+            ses = createSession(sock);
+
             lastMetricsUpdateMsgTimeNanos = System.nanoTime();
         }
 
@@ -7804,7 +7799,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 byte[] msgBytes = msgT.get2();
 
                 if (msgBytes == null)
-                    msgBytes = U.marshal(spi.marshaller(), msg);
+                    msgBytes = ses.serializeMessage(msg);
 
                 DebugLogger msgLog = messageLogger(msg);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index d8d5164f4af..6ff2faa1974 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
@@ -439,6 +440,16 @@ abstract class TcpDiscoveryImpl {
         return res;
     }
 
+    /**
+     * Instantiates IO session for exchanging discovery messages with remote 
node.
+     *
+     * @param sock Socket to remote node.
+     * @return IO session for writing and reading {@link 
TcpDiscoveryAbstractMessage}.
+     */
+    TcpDiscoveryIoSession createSession(Socket sock) {
+        return new TcpDiscoveryIoSession(sock, spi);
+    }
+
     /**
      * @param msg Message.
      * @return Message logger.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
new file mode 100644
index 00000000000..1bf398c9780
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
+
+/**
+ * Handles I/O operations between discovery nodes in the cluster. This class 
encapsulates the socket connection used
+ * by the {@link TcpDiscoverySpi} to exchange discovery protocol messages 
between nodes.
+ * <p>
+ * Currently, there are two modes for message serialization:
+ * <ul>
+ *     <li>Using {@link MessageSerializer} for messages implementing the 
{@link Message} interface.</li>
+ *     <li>Deprecated: Using {@link JdkMarshaller} for messages that have not 
yet been refactored.</li>
+ * </ul>
+ * A leading byte is used to distinguish between the modes. The byte will be 
removed in the future.
+ */
+public class TcpDiscoveryIoSession {
+    /** Default size of buffer used for buffering socket in/out. */
+    private static final int DFLT_SOCK_BUFFER_SIZE = 8192;
+
+    /** Size for an intermediate buffer for serializing discovery messages. */
+    private static final int MSG_BUFFER_SIZE = 100;
+
+    /** Leading byte for messages that use {@link JdkMarshaller} for serialization. 
*/
+    // TODO: remove these flags after refactoring all discovery messages.
+    static final byte JAVA_SERIALIZATION = (byte)1;
+
+    /** Leading byte for messages that use {@link MessageSerializer} for 
serialization. */
+    static final byte MESSAGE_SERIALIZATION = (byte)2;
+
+    /** */
+    private final TcpDiscoverySpi spi;
+
+    /** Loads discovery messages classes during java deserialization. */
+    private final ClassLoader clsLdr;
+
+    /** */
+    private final Socket sock;
+
+    /** */
+    private final DirectMessageWriter msgWriter;
+
+    /** */
+    private final DirectMessageReader msgReader;
+
+    /** Buffered socket output stream. */
+    private final OutputStream out;
+
+    /** Buffered socket input stream. */
+    private final InputStream in;
+
+    /** Intermediate buffer for serializing discovery messages. */
+    private final ByteBuffer msgBuf;
+
+    /**
+     * Creates a new discovery I/O session bound to the given socket.
+     *
+     * @param sock Socket connected to a remote discovery node.
+     * @param spi  Discovery SPI instance owning this session.
+     * @throws IgniteException If an I/O error occurs while initializing 
buffers.
+     */
+    TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) {
+        this.sock = sock;
+        this.spi = spi;
+
+        clsLdr = U.resolveClassLoader(spi.ignite().configuration());
+
+        msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
+
+        msgWriter = new DirectMessageWriter(spi.messageFactory());
+        msgReader = new DirectMessageReader(spi.messageFactory(), null);
+
+        try {
+            int sendBufSize = sock.getSendBufferSize() > 0 ? 
sock.getSendBufferSize() : DFLT_SOCK_BUFFER_SIZE;
+            int rcvBufSize = sock.getReceiveBufferSize() > 0 ? 
sock.getReceiveBufferSize() : DFLT_SOCK_BUFFER_SIZE;
+
+            out = new BufferedOutputStream(sock.getOutputStream(), 
sendBufSize);
+            in = new BufferedInputStream(sock.getInputStream(), rcvBufSize);
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Writes a discovery message to the underlying socket output stream.
+     *
+     * @param msg Message to send to the remote node.
+     * @throws IgniteCheckedException If serialization fails.
+     */
+    void writeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
+        if (!(msg instanceof Message)) {
+            out.write(JAVA_SERIALIZATION);
+
+            U.marshal(spi.marshaller(), msg, out);
+
+            return;
+        }
+
+        try {
+            out.write(MESSAGE_SERIALIZATION);
+
+            serializeMessage((Message)msg, out);
+
+            out.flush();
+        }
+        catch (Exception e) {
+            // Keep logic similar to `U.marshal(...)`.
+            if (e instanceof IgniteCheckedException)
+                throw (IgniteCheckedException)e;
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Reads the next discovery message from the socket input stream.
+     *
+     * @param <T> Type of the expected message.
+     * @return Deserialized message instance.
+     * @throws IgniteCheckedException If deserialization fails.
+     */
+    <T> T readMessage() throws IgniteCheckedException, IOException {
+        byte serMode = (byte)in.read();
+
+        if (JAVA_SERIALIZATION == serMode)
+            return U.unmarshal(spi.marshaller(), in, clsLdr);
+
+        try {
+            if (MESSAGE_SERIALIZATION != serMode) {
+                detectSslAlert(serMode, in);
+
+                throw new IgniteCheckedException("Received unexpected byte 
while reading discovery message: " + serMode);
+            }
+
+            byte b0 = (byte)in.read();
+            byte b1 = (byte)in.read();
+
+            Message msg = spi.messageFactory().create(makeMessageType(b0, b1));
+
+            msgReader.reset();
+            msgReader.setBuffer(msgBuf);
+
+            MessageSerializer msgSer = 
spi.messageFactory().serializer(msg.directType());
+
+            boolean finished;
+
+            do {
+                msgBuf.clear();
+
+                int read = in.read(msgBuf.array(), 0, msgBuf.limit());
+
+                if (read == -1)
+                    throw new EOFException("Connection closed before message 
was fully read.");
+
+                msgBuf.limit(read);
+
+                finished = msgSer.readFrom(msg, msgReader);
+            } while (!finished);
+
+            return (T)msg;
+        }
+        catch (Exception e) {
+            // Keep logic similar to `U.unmarshal(...)`.
+            if (e instanceof IgniteCheckedException)
+                throw (IgniteCheckedException)e;
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Serializes a discovery message into a byte array.
+     *
+     * @param msg Discovery message to serialize.
+     * @return Serialized byte array containing the message data.
+     * @throws IgniteCheckedException If serialization fails.
+     * @throws IOException If serialization fails.
+     */
+    byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
+        if (!(msg instanceof Message))
+            return U.marshal(spi.marshaller(), msg);
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            serializeMessage((Message)msg, out);
+
+            return out.toByteArray();
+        }
+    }
+
+    /** @return Socket. */
+    public Socket socket() {
+        return sock;
+    }
+
+    /**
+     * Serializes a discovery message into given output stream.
+     *
+     * @param m Discovery message to serialize.
+     * @param out Output stream to write serialized message.
+     * @throws IOException If serialization fails.
+     */
+    private void serializeMessage(Message m, OutputStream out) throws 
IOException {
+        MessageSerializer msgSer = 
spi.messageFactory().serializer(m.directType());
+
+        msgWriter.reset();
+        msgWriter.setBuffer(msgBuf);
+
+        boolean finished;
+
+        do {
+            finished = msgSer.writeTo(m, msgWriter);
+
+            out.write(msgBuf.array(), 0, msgBuf.position());
+
+            msgBuf.clear();
+        }
+        while (!finished);
+    }
+
+    /**
+     * Checks whether the input stream contains an SSL alert.
+     * See handling {@code StreamCorruptedException} in {@link #readMessage()}.
+     * Keeps logic similar to {@link 
java.io.ObjectInputStream#readStreamHeader}.
+     */
+    private void detectSslAlert(byte firstByte, InputStream in) throws 
IOException {
+        byte[] hdr = new byte[4];
+        hdr[0] = firstByte;
+        int read = in.readNBytes(hdr, 1, 3);
+
+        if (read < 3)
+            throw new EOFException();
+
+        String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], 
hdr[3]);
+
+        if (hex.matches("15....00"))
+            throw new StreamCorruptedException("invalid stream header: " + 
hex);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index b82f3fca33a..1ce14f60cdd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
-import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.StreamCorruptedException;
@@ -58,6 +56,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
@@ -73,6 +73,9 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiAdapter;
@@ -450,6 +453,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     /** */
     protected IgniteSpiContext spiCtx;
 
+    /** Discovery messages factory. */
+    private MessageFactory msgFactory;
+
     /** For test purposes. */
     private boolean skipAddrsRandomization = false;
 
@@ -1113,6 +1119,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
         locNodeVer = ver;
     }
 
+    /** @return Discovery messages factory. */
+    public MessageFactory messageFactory() {
+        return msgFactory;
+    }
+
     /**
      * Gets ID of the local node.
      *
@@ -1565,18 +1576,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
         return openSocket(createSocket(), sockAddr, timeoutHelper);
     }
 
-    /**
-     * @param sock Socket.
-     * @return Buffered stream wrapping socket stream.
-     * @throws IOException If failed.
-     */
-    final BufferedOutputStream socketStream(Socket sock) throws IOException {
-        int bufSize = sock.getSendBufferSize();
-
-        return bufSize > 0 ? new BufferedOutputStream(sock.getOutputStream(), 
bufSize) :
-            new BufferedOutputStream(sock.getOutputStream());
-    }
-
     /**
      * Connects to remote address sending {@code U.IGNITE_HEADER} when 
connection is established.
      *
@@ -1692,6 +1691,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
         try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
             OutputStream out = sock.getOutputStream();
 
+            // Prefix a message with its serialization mode byte; raw data without a message (Ignite header) gets none.
+            if (msg != null) {
+                byte mode = msg instanceof Message ? 
TcpDiscoveryIoSession.MESSAGE_SERIALIZATION : 
TcpDiscoveryIoSession.JAVA_SERIALIZATION;
+
+                out.write(mode);
+            }
+
             out.write(data);
 
             out.flush();
@@ -1728,22 +1734,24 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     /**
      * Writes message to the socket.
      *
-     * @param sock Socket.
-     * @param out Stream to write to.
+     * @param ses IO session.
      * @param msg Message.
      * @param timeout Timeout.
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
      */
-    protected void writeToSocket(Socket sock,
-        OutputStream out,
+    protected void writeMessage(
+        TcpDiscoveryIoSession ses,
         TcpDiscoveryAbstractMessage msg,
-        long timeout) throws IOException, IgniteCheckedException {
+        long timeout
+    ) throws IOException, IgniteCheckedException {
+        Socket sock = ses.socket();
+
         assert sock != null;
         assert msg != null;
 
         try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
-            U.marshal(marshaller(), msg, out);
+            ses.writeMessage(msg);
         }
         catch (IgniteCheckedException e) {
             SSLException sslEx = checkSslException(sock, e);
@@ -1782,15 +1790,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     /**
      * Reads message from the socket limiting read time.
      *
-     * @param sock Socket.
-     * @param in Input stream (in case socket stream was wrapped).
+     * @param ses IO session.
      * @param timeout Socket timeout for this operation.
      * @return Message.
      * @throws IOException If IO failed or read timed out.
      * @throws IgniteCheckedException If unmarshalling failed.
      */
-    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long 
timeout) throws IOException,
+    protected <T> T readMessage(TcpDiscoveryIoSession ses, long timeout) 
throws IOException,
         IgniteCheckedException {
+        Socket sock = ses.socket();
+
         assert sock != null;
 
         int oldTimeout = sock.getSoTimeout();
@@ -1798,10 +1807,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
         try {
             sock.setSoTimeout((int)timeout);
 
-            T res = U.unmarshal(marshaller(), in == null ? 
sock.getInputStream() : in,
-                U.resolveClassLoader(ignite.configuration()));
-
-            return res;
+            return ses.readMessage();
         }
         catch (IOException | IgniteCheckedException e) {
             if (X.hasCause(e, SocketTimeoutException.class))
@@ -2109,6 +2115,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
 
         registerMBean(igniteInstanceName, new TcpDiscoverySpiMBeanImpl(this), 
TcpDiscoverySpiMBean.class);
 
+        msgFactory = new IgniteMessageFactoryImpl(
+            new MessageFactoryProvider[] { new DiscoveryMessageFactory() });
+
         impl.spiStart(igniteInstanceName);
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 93d8bcf9e5e..505980bef3d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -58,6 +59,7 @@ public abstract class TcpDiscoveryAbstractMessage implements 
Serializable {
     private transient UUID sndNodeId;
 
     /** Message ID. */
+    @Order(0)
     private IgniteUuid id;
 
     /**
@@ -135,6 +137,15 @@ public abstract class TcpDiscoveryAbstractMessage 
implements Serializable {
         return id;
     }
 
+    /**
+     * Sets message ID.
+     *
+     * @param id Message ID.
+     */
+    public void id(IgniteUuid id) {
+        this.id = id;
+    }
+
     /**
      * Gets sender node ID.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
index e282410f40e..c65db903e0f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java
@@ -18,17 +18,24 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * Message telling joining node that it failed coordinator's validation check.
  */
-public class TcpDiscoveryCheckFailedMessage extends 
TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryCheckFailedMessage extends 
TcpDiscoveryAbstractMessage implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Coordinator version. */
-    private final String err;
+    @Order(value = 1, method = "error")
+    private String err;
+
+    /** */
+    public TcpDiscoveryCheckFailedMessage() {
+    }
 
     /**
      * Constructor.
@@ -49,6 +56,18 @@ public class TcpDiscoveryCheckFailedMessage extends 
TcpDiscoveryAbstractMessage
         return err;
     }
 
+    /**
+     * @param err message from coordinator.
+     */
+    public void error(String err) {
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 0;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryCheckFailedMessage.class, this, "super", 
super.toString());
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
index 285a70efc74..b29a8171fbc 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
@@ -45,6 +44,7 @@ import 
org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -363,12 +363,12 @@ public class IgniteClientRejoinTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
-            if (blockAll || block && sock.getPort() == 47500)
+            if (blockAll || block && ses.socket().getPort() == 47500)
                 throw new SocketException("Test discovery exception");
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
index 304991e46df..b69d65446ff 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Arrays;
@@ -33,6 +32,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -313,14 +313,14 @@ public class IgniteDiscoveryMassiveNodeFailTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
-            assertNotFailedNode(sock);
+            assertNotFailedNode(ses.socket());
 
             if (isDrop(msg))
                 return;
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
index 75cab20d4ac..4dfa5f43da5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -40,6 +38,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -75,9 +74,8 @@ public class CacheClientsConcurrentStartTest extends 
GridCommonAbstractTest {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
-            @Override protected void writeToSocket(
-                Socket sock,
-                OutputStream out,
+            @Override protected void writeMessage(
+                TcpDiscoveryIoSession ses,
                 TcpDiscoveryAbstractMessage msg,
                 long timeout
             ) throws IOException, IgniteCheckedException {
@@ -92,7 +90,7 @@ public class CacheClientsConcurrentStartTest extends 
GridCommonAbstractTest {
                     }
                 }
 
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
             }
         };
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
index 2c055f2dd99..fdfb3b90cd3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -38,6 +37,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -228,13 +228,12 @@ public abstract class 
IgniteCacheTopologySplitAbstractTest extends GridCommonAbs
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
-            checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), 
timeout);
+            
checkSegmented((InetSocketAddress)ses.socket().getRemoteSocketAddress(), 
timeout);
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
index 0ba28ca3e01..aaae9d6b9e2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
@@ -18,8 +18,6 @@
 package 
org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
@@ -39,6 +37,7 @@ import 
org.apache.ignite.internal.util.distributed.InitMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -219,9 +218,8 @@ public class IncrementalSnapshotJoiningClientTest extends 
AbstractIncrementalSna
     /** */
     private static class ClientBlockingDiscoverySpi extends TcpDiscoverySpi {
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(
+            TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout
         ) throws IOException, IgniteCheckedException {
@@ -231,7 +229,7 @@ public class IncrementalSnapshotJoiningClientTest extends 
AbstractIncrementalSna
                 U.awaitQuiet(unblockClientJoinReq);
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
index 8bce7675f88..ab8022dbdd2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -37,6 +36,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -208,15 +208,14 @@ public class IgniteTcpCommunicationConnectOnInitTest 
extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(
+            TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout
         ) throws IOException, IgniteCheckedException {
             awaitLatch();
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
index 0e26ff95b76..aeba1e4d7c1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
@@ -18,9 +18,6 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Set;
 import java.util.UUID;
@@ -45,10 +42,10 @@ import 
org.apache.ignite.plugin.segmentation.SegmentationPolicy;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 /**
@@ -281,7 +278,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends 
GridCommonAbstractTe
         private final CountDownLatch netDisabledLatch = new CountDownLatch(1);
 
         /** {@inheritDoc} */
-        @Override protected <T> T readMessage(Socket sock, @Nullable 
InputStream in,
+        @Override protected <T> T readMessage(TcpDiscoveryIoSession ses,
             long timeout) throws IOException, IgniteCheckedException {
             if (netDisabled) {
                 U.sleep(timeout);
@@ -289,11 +286,11 @@ public class TcpCommunicationSpiSkipMessageSendTest 
extends GridCommonAbstractTe
                 throw new SocketTimeoutException("CustomDiscoverySpi: network 
is disabled.");
             }
             else
-                return super.readMessage(sock, in, timeout);
+                return super.readMessage(ses, timeout);
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (netDisabled) {
                 netDisabledLatch.countDown();
@@ -301,7 +298,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends 
GridCommonAbstractTe
                 throw new SocketTimeoutException("CustomDiscoverySpi: network 
is disabled.");
             }
             else
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
         }
 
         /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
index 8b0c912eed5..3a02b54260d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.spi.discovery;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.util.Collections;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -28,6 +26,7 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -147,7 +146,7 @@ public class LongClientConnectToClusterTest extends 
GridCommonAbstractTest {
         public static final int DELAY_MSG_PERIOD_MILLIS = 2_000;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, 
IgniteCheckedException {
             if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && 
msg.topologyVersion() == 3) {
                 log.info("Catched discovery message: " + msg);
@@ -162,7 +161,7 @@ public class LongClientConnectToClusterTest extends 
GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
index 65688863a84..f72eac99994 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -88,13 +87,12 @@ public class BlockTcpDiscoverySpi extends 
TestTcpDiscoverySpi {
     }
 
     /** {@inheritDoc} */
-    @Override protected void writeToSocket(Socket sock,
-        OutputStream out,
+    @Override protected void writeMessage(TcpDiscoveryIoSession ses,
         TcpDiscoveryAbstractMessage msg,
         long timeout) throws IOException, IgniteCheckedException {
         if (spiCtx != null)
             apply(spiCtx.localNode(), msg);
 
-        super.writeToSocket(sock, out, msg, timeout);
+        super.writeMessage(ses, msg, timeout);
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
index a184cd42fe4..b8a82d52cc9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
@@ -175,6 +175,7 @@ public class DiscoveryUnmarshalVulnerabilityTest extends 
GridCommonAbstractTest
             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
         ) {
             oos.write(U.IGNITE_HEADER);
+            oos.write((byte)1); // Flag for java serialization.
             oos.write(data);
         }
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
index 472682c1996..ca316204037 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
@@ -18,10 +18,8 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -211,9 +209,8 @@ public class IgniteClientConnectTest extends 
GridCommonAbstractTest {
      */
     class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(
+            TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout
         ) throws IOException, IgniteCheckedException {
@@ -228,10 +225,10 @@ public class IgniteClientConnectTest extends 
GridCommonAbstractTest {
                         fail("Unexpected interrupt on nodeAddFinishedDelay");
                     }
 
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
             }
             else
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
         }
 
         /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 2708b606007..7eef5a2739b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
@@ -42,7 +40,6 @@ import 
org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
-import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -475,8 +472,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest 
extends TcpClientDiscov
         }
 
         /**  */
-        @Override protected void writeToSocket(Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (writeToSocketDelay > 0) {
@@ -490,8 +486,8 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest 
extends TcpClientDiscov
                 }
             }
 
-            if (sock.getSoTimeout() >= writeToSocketDelay)
-                super.writeToSocket(sock, out, msg, timeout);
+            if (ses.socket().getSoTimeout() >= writeToSocketDelay)
+                super.writeMessage(ses, msg, timeout);
             else
                 throw new SocketTimeoutException("Write to socket delay 
timeout exception.");
         }
@@ -521,14 +517,14 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest 
extends TcpClientDiscov
         }
 
         /** {@inheritDoc} */
-        @Override protected <T> T readMessage(Socket sock, @Nullable 
InputStream in, long timeout)
+        @Override protected <T> T readMessage(TcpDiscoveryIoSession ses, long 
timeout)
             throws IOException, IgniteCheckedException {
             long currTimeout = getLocalNode().isClient() ?
                 clientFailureDetectionTimeout() : failureDetectionTimeout();
 
             if (readDelay < currTimeout) {
                 try {
-                    return super.readMessage(sock, in, timeout);
+                    return super.readMessage(ses, timeout);
                 }
                 catch (Exception e) {
                     err = e;
@@ -537,7 +533,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest 
extends TcpClientDiscov
                 }
             }
             else {
-                T msg = super.readMessage(sock, in, timeout);
+                T msg = super.readMessage(ses, timeout);
 
                 if (msg instanceof TcpDiscoveryPingRequest) {
                     try {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index f3d45f9f097..6ca4782c65c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -2573,19 +2572,18 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
-            if (!onMessage(sock, msg))
+            if (!onMessage(ses.socket(), msg))
                 return;
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
 
             if (afterWrite != null)
-                afterWrite.apply(msg, sock);
+                afterWrite.apply(msg, ses.socket());
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
index 10c6aa14242..f6b70244744 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
@@ -302,9 +301,8 @@ public class TcpDiscoveryCoordinatorFailureTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(
+            TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout
         ) throws IOException, IgniteCheckedException {
@@ -314,7 +312,7 @@ public class TcpDiscoveryCoordinatorFailureTest extends 
GridCommonAbstractTest {
                 msg = new TcpDiscoveryConnectionCheckMessage(locNode);
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
index 481eb24a0fb..e1e647c01e6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -199,10 +198,10 @@ public class TcpDiscoveryFailedJoinTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
-            if (sock.getPort() != FAIL_PORT)
-                super.writeToSocket(sock, out, msg, timeout);
+            if (ses.socket().getPort() != FAIL_PORT)
+                super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index 7162083af9c..3b2c9258044 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -17,7 +17,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -538,9 +537,9 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
 
         Object spis = GridTestUtils.getFieldValue(disco, 
GridManagerAdapter.class, "spis");
 
-        OutputStream out = GridTestUtils.getFieldValue(((Object[])spis)[0], 
"impl", "msgWorker", "out");
+        TcpDiscoveryIoSession ses = 
GridTestUtils.getFieldValue(((Object[])spis)[0], "impl", "msgWorker", "ses");
 
-        out.close();
+        ses.socket().getOutputStream().close();
     }
 
     /**
@@ -627,21 +626,21 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             BiConsumer<Socket, TcpDiscoveryHandshakeRequest> hsRqLsnr;
             BiConsumer<Socket, TcpDiscoveryHandshakeResponse> hsRespLsnr;
 
             if (msg instanceof TcpDiscoveryHandshakeRequest && (hsRqLsnr = 
this.hsRqLsnr.get()) != null)
-                hsRqLsnr.accept(sock, (TcpDiscoveryHandshakeRequest)msg);
+                hsRqLsnr.accept(ses.socket(), 
(TcpDiscoveryHandshakeRequest)msg);
 
             if (msg instanceof TcpDiscoveryHandshakeResponse && (hsRespLsnr = 
this.hsRespLsnr.get()) != null)
-                hsRespLsnr.accept(sock, (TcpDiscoveryHandshakeResponse)msg);
+                hsRespLsnr.accept(ses.socket(), 
(TcpDiscoveryHandshakeResponse)msg);
 
-            if (dropMsg(sock))
+            if (dropMsg(ses.socket()))
                 return;
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
index 0bdf0a0558a..f449be13c8e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java
@@ -167,8 +167,8 @@ public class TcpDiscoveryNodeJoinAndFailureTest extends 
GridCommonAbstractTest {
 
                     if (nodeId.equals(node2Id)) {
                         Object workerObj = GridTestUtils.getFieldValue(impl, 
"msgWorker");
-
-                        OutputStream out = 
GridTestUtils.getFieldValue(workerObj, "out");
+                        TcpDiscoveryIoSession ses = 
GridTestUtils.getFieldValue(workerObj, "ses");
+                        OutputStream out = GridTestUtils.getFieldValue(ses, 
"out");
 
                         try {
                             out.close();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index ddb52738db5..15cb1f90382 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Set;
 import org.apache.ignite.Ignite;
@@ -265,10 +264,10 @@ public class TcpDiscoveryPendingMessageDeliveryTest 
extends GridCommonAbstractTe
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (!blockMsgs)
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index c01fc8a5cbc..3195284ba92 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -19,9 +19,7 @@ package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2448,9 +2446,8 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(
+            TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             // Test relies on an error in this thread only.
@@ -2459,14 +2456,14 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             if (startTest && !(msg instanceof 
TcpDiscoveryConnectionCheckMessage) && ringMsgWorkerThread) {
                 int errPort = errPortSupplier.get();
 
-                if (sock.getPort() == errPort) {
+                if (ses.socket().getPort() == errPort) {
                     log.info("Fail write on message send [port=" + errPort + 
", msg=" + msg + ']');
 
                     throw new SocketTimeoutException();
                 }
                 else if (locNode.discoveryPort() == errPort) {
                     if (sleepEndTime == 0) {
-                        errNextPort = sock.getPort();
+                        errNextPort = ses.socket().getPort();
 
                         sleepEndTime = System.currentTimeMillis() + 3000;
                     }
@@ -2487,8 +2484,8 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
                         log.info("Stop sleep on message send: " + msg);
 
-                        if (sock.getPort() == errNextPort) {
-                            log.info("Fail write after sleep [port=" + 
sock.getPort() + ", msg=" + msg + ']');
+                        if (ses.socket().getPort() == errNextPort) {
+                            log.info("Fail write after sleep [port=" + 
ses.socket().getPort() + ", msg=" + msg + ']');
 
                             throw new SocketTimeoutException();
                         }
@@ -2496,7 +2493,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2514,7 +2511,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         static volatile boolean checkClientNodeAddFinished;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -2536,7 +2533,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /**
@@ -2580,7 +2577,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         private volatile boolean failed;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             boolean add = msgIds.add(msg.id());
@@ -2591,7 +2588,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 failed = true;
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2603,8 +2600,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         private volatile boolean stopBeforeSndAck;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (stopBeforeSndAck) {
@@ -2616,7 +2612,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                         if (custMsg instanceof 
StartRoutineAckDiscoveryMessage) {
                             log.info("Skip message send and stop node: " + 
msg);
 
-                            sock.close();
+                            ses.socket().close();
 
                             GridTestUtils.runAsync(new Callable<Object>() {
                                 @Override public Object call() throws 
Exception {
@@ -2635,7 +2631,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2666,8 +2662,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock,
-            OutputStream out,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses,
             TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (stop)
@@ -2678,7 +2673,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
                 log.info("IO error on message send [locNode=" + locNode + ", 
msg=" + msg + ']');
 
-                sock.close();
+                ses.socket().close();
 
                 throw new SocketTimeoutException();
             }
@@ -2688,7 +2683,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 failMsg.compareAndSet(false, true)) {
                 log.info("IO error on message send [locNode=" + locNode + ", 
msg=" + msg + ']');
 
-                sock.close();
+                ses.socket().close();
 
                 throw new SocketTimeoutException();
             }
@@ -2700,7 +2695,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
                 log.info("Skip messages send and stop node [locNode=" + 
locNode + ", msg=" + msg + ']');
 
-                sock.close();
+                ses.socket().close();
 
                 GridTestUtils.runAsync(new Callable<Object>() {
                     @Override public Object call() throws Exception {
@@ -2713,7 +2708,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                 return;
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2728,7 +2723,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         private boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryCustomEventMessage && latch != 
null) {
                 log.info("Stop node on custom event: " + msg);
@@ -2741,7 +2736,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             if (stop)
                 return;
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2762,7 +2757,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         private boolean debug;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                 if (nodeAdded1 != null) {
@@ -2790,7 +2785,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
                 log.info("--- Send custom event: " + msg);
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2818,7 +2813,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
 
             if (stop) {
@@ -2835,7 +2830,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
     }
 
@@ -2847,12 +2842,12 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
         private volatile boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 throw new RuntimeException("Failing ring message worker 
explicitly");
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 stop = true;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index 2368cb6d19b..55f83d72fc6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
@@ -303,13 +302,13 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest 
extends AbstractDiscoverySelf
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg, long timeout)
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             if (!(msg instanceof TcpDiscoveryPingRequest)) {
                 if (cntConnCheckMsg && msg instanceof 
TcpDiscoveryConnectionCheckMessage)
                     connCheckStatusMsgCntSent++;
 
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
 
                 return;
             }
@@ -329,7 +328,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends 
AbstractDiscoverySelf
                 }
             }
             else
-                super.writeToSocket(sock, out, msg, timeout);
+                super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
index e06733a3810..5a0726e7a60 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.Socket;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -400,13 +399,13 @@ public class TcpDiscoverySpiReconnectDelayTest extends 
GridCommonAbstractTest {
         private final AtomicInteger failReconReq = new AtomicInteger();
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
 
-            if (!onMessage(sock, msg))
+            if (!onMessage(ses.socket(), msg))
                 return;
 
-            super.writeToSocket(sock, out, msg, timeout);
+            super.writeMessage(ses, msg, timeout);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
index 4c876d14b0a..dcf9931ef92 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.StreamCorruptedException;
-import java.net.Socket;
 import java.util.concurrent.Callable;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.IgniteCheckedException;
@@ -28,7 +26,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 /**
@@ -177,7 +174,7 @@ public class TcpDiscoverySslSecuredUnsecuredTest extends 
GridCommonAbstractTest
         }
 
         /** {@inheritDoc} */
-        @Override protected <T> T readMessage(final Socket sock, @Nullable 
final InputStream in,
+        @Override protected <T> T readMessage(final TcpDiscoveryIoSession ses,
             final long timeout) throws IOException, IgniteCheckedException {
             if (cnt-- > 0) {
                 if (plain)
@@ -186,7 +183,7 @@ public class TcpDiscoverySslSecuredUnsecuredTest extends 
GridCommonAbstractTest
                     throw new SSLException("Test SSL exception");
             }
 
-            return super.readMessage(sock, in, timeout);
+            return super.readMessage(ses, timeout);
         }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 669b3112fd8..e3038bf5dc6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
@@ -48,7 +46,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi 
implements IgniteDiscov
     private IgniteDiscoverySpiInternalListener internalLsnr;
 
     /** {@inheritDoc} */
-    @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+    @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
         IgniteCheckedException {
         if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
             return;
@@ -61,7 +59,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi 
implements IgniteDiscov
                 internalLsnr.beforeReconnect(locNode, log);
         }
 
-        super.writeToSocket(sock, out, msg, timeout);
+        super.writeMessage(ses, msg, timeout);
     }
 
     /** {@inheritDoc} */


Reply via email to