This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c0df41f0b2 IGNITE-25482 Use MessageSerializer for handshake messages 
(#12380)
6c0df41f0b2 is described below

commit 6c0df41f0b2f76899da11e7bb99612ac1d9d4bba
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Oct 17 18:04:25 2025 +0300

    IGNITE-25482 Use MessageSerializer for handshake messages (#12380)
---
 .../apache/ignite/internal/MessageProcessor.java   |   6 +-
 .../internal/MessageSerializerGenerator.java       |   7 +-
 .../managers/communication/GridIoManager.java      |   6 +-
 .../communication/GridIoMessageFactory.java        |  13 ++-
 .../ignite/internal/util/nio/GridDirectParser.java |   8 +-
 .../ignite/internal/util/nio/GridNioServer.java    |  51 ++++++++-
 .../extensions/communication/MessageFormatter.java |   7 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java    |   4 +-
 .../spi/communication/tcp/TcpCommunicationSpi.java |  17 +--
 .../tcp/internal/ClusterStateProvider.java         |   7 ++
 .../tcp/internal/GridNioServerWrapper.java         |  24 +++--
 .../tcp/internal/InboundConnectionHandler.java     |   4 +-
 .../tcp/internal/TcpHandshakeExecutor.java         | 119 ++++++++++++++++-----
 .../tcp/messages/HandshakeMessage.java             |  77 ++++++-------
 .../tcp/messages/HandshakeWaitMessage.java         |  21 +---
 .../communication/tcp/messages/NodeIdMessage.java  |  49 +++------
 .../tcp/messages/RecoveryLastReceivedMessage.java  |  32 ++----
 ...iteIoCommunicationMessageSerializationTest.java |   9 +-
 .../tcp/TcpCommunicationHandshakeTimeoutTest.java  |   5 +-
 .../ignite/testframework/GridSpiTestContext.java   |   4 +-
 20 files changed, 263 insertions(+), 207 deletions(-)

diff --git 
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
 
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
index 177d8a87ad2..afbc411bb32 100644
--- 
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
+++ 
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
@@ -68,11 +68,15 @@ public class MessageProcessor extends AbstractProcessor {
     /** Base interface that every message must implement. */
     static final String MESSAGE_INTERFACE = 
"org.apache.ignite.plugin.extensions.communication.Message";
 
+    /** This is the only message with zero fields. A serializer must be 
generated due to restrictions in our communication process. */
+    static final String HANDSHAKE_WAIT_MESSAGE = 
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
+
     /**
      * Processes all classes implementing the {@code Message} interface and 
generates corresponding serializer code.
      */
     @Override public boolean process(Set<? extends TypeElement> annotations, 
RoundEnvironment roundEnv) {
         TypeMirror msgType = 
processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
+        TypeMirror handshakeWaitMsgType = 
processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
 
         Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();
 
@@ -90,7 +94,7 @@ public class MessageProcessor extends AbstractProcessor {
 
             List<VariableElement> fields = orderedFields(clazz);
 
-            if (!fields.isEmpty())
+            if (!fields.isEmpty() || 
processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
                 msgFields.put(clazz, fields);
         }
 
diff --git 
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
 
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index d052aeb1dda..2ad8b96c728 100644
--- 
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++ 
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -650,9 +650,10 @@ class MessageSerializerGenerator {
 
     /** */
     private void finish(List<String> code) {
-        // Remove the last empty line for the last "case".
-        String removed = code.remove(code.size() - 1);
-        assert EMPTY.equals(removed) : removed;
+        String lastLine = code.get(code.size() - 1);
+
+        if (EMPTY.equals(lastLine))
+            code.remove(code.size() - 1);
 
         code.add(line("}"));
         code.add(EMPTY);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index f2f23c443c3..280f4fb1e77 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -433,13 +433,11 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
         }
         else {
             formatter = new MessageFormatter() {
-                @Override public MessageWriter writer(UUID rmtNodeId, 
MessageFactory msgFactory) {
-                    assert rmtNodeId != null;
-
+                @Override public MessageWriter writer(MessageFactory 
msgFactory) {
                     return new DirectMessageWriter(msgFactory);
                 }
 
-                @Override public MessageReader reader(UUID rmtNodeId, 
MessageFactory msgFactory) {
+                @Override public MessageReader reader(MessageFactory 
msgFactory) {
                     return new DirectMessageReader(msgFactory, 
ctx.cacheObjects());
                 }
             };
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 920eef60d96..752448712fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -87,6 +87,8 @@ import 
org.apache.ignite.internal.codegen.GridQueryNextPageRequestSerializer;
 import org.apache.ignite.internal.codegen.GridQueryNextPageResponseSerializer;
 import org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
 import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
+import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
+import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
 import 
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
 import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
 import 
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
@@ -96,6 +98,8 @@ import 
org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
 import 
org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer;
 import 
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
 import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
+import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
+import 
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
 import 
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
 import 
org.apache.ignite.internal.codegen.ServiceSingleNodeDeploymentResultBatchSerializer;
 import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
@@ -271,10 +275,11 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)-25, TxLock::new, new TxLockSerializer());
         factory.register((short)-24, TxLocksRequest::new, new 
TxLocksRequestSerializer());
         factory.register((short)-23, TxLocksResponse::new, new 
TxLocksResponseSerializer());
-        factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, 
NodeIdMessage::new);
-        factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, 
RecoveryLastReceivedMessage::new);
-        factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, 
HandshakeMessage::new);
-        factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, 
HandshakeWaitMessage::new);
+        factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, 
NodeIdMessage::new, new NodeIdMessageSerializer());
+        factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, 
RecoveryLastReceivedMessage::new,
+            new RecoveryLastReceivedMessageSerializer());
+        factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, 
HandshakeMessage::new, new HandshakeMessageSerializer());
+        factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, 
HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer());
         factory.register((short)0, GridJobCancelRequest::new, new 
GridJobCancelRequestSerializer());
         factory.register((short)1, GridJobExecuteRequest::new);
         factory.register((short)2, GridJobExecuteResponse::new);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index cde984913a8..65db20856c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
@@ -83,8 +84,11 @@ public class GridDirectParser implements GridNioParser {
 
             boolean finished = false;
 
-            if (msg != null && buf.hasRemaining())
-                finished = msg.readFrom(buf, reader);
+            if (msg != null && buf.hasRemaining()) {
+                MessageSerializer msgSer = 
msgFactory.serializer(msg.directType());
+
+                finished = msgSer.readFrom(msg, buf, reader);
+            }
 
             if (finished) {
                 if (reader != null)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index aaa78cd1769..849e6485eae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
 import org.apache.ignite.internal.processors.tracing.NoopSpan;
@@ -85,7 +86,9 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -289,6 +292,9 @@ public class GridNioServer<T> {
     /** Tracing processor. */
     private Tracing tracing;
 
+    /** Message factory. */
+    private final MessageFactory msgFactory;
+
     /**
      * @param addr Address.
      * @param port Port.
@@ -341,6 +347,7 @@ public class GridNioServer<T> {
         @Nullable GridWorkerListener workerLsnr,
         @Nullable MetricRegistryImpl mreg,
         Tracing tracing,
+        MessageFactory msgFactory,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         if (port != -1)
@@ -368,6 +375,7 @@ public class GridNioServer<T> {
         this.readWriteSelectorsAssign = readWriteSelectorsAssign;
         this.lsnr = lsnr;
         this.tracing = tracing == null ? new NoopTracing() : tracing;
+        this.msgFactory = msgFactory;
 
         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), 
filters);
 
@@ -543,6 +551,13 @@ public class GridNioServer<T> {
         return locAddr;
     }
 
+    /**
+     * @return Message factory.
+     */
+    public MessageFactory messageFactory() {
+        return msgFactory;
+    }
+
     /**
      * @return Selector spins.
      */
@@ -1593,7 +1608,16 @@ public class GridNioServer<T> {
 
                 int startPos = buf.position();
 
-                finished = msg.writeTo(buf, writer);
+                if (messageFactory() == null) {
+                    assert msg instanceof ClientMessage;  // TODO: Will 
refactor in IGNITE-26554.
+
+                    finished = msg.writeTo(buf, writer);
+                }
+                else {
+                    MessageSerializer msgSer = 
messageFactory().serializer(msg.directType());
+
+                    finished = msgSer.writeTo(msg, buf, writer);
+                }
 
                 span.addTag(SOCKET_WRITE_BYTES, () -> 
Integer.toString(buf.position() - startPos));
 
@@ -1783,7 +1807,16 @@ public class GridNioServer<T> {
 
                 int startPos = buf.position();
 
-                finished = msg.writeTo(buf, writer);
+                if (msgFactory == null) {
+                    assert msg instanceof ClientMessage;  // TODO: Will 
refactor in IGNITE-26554.
+
+                    finished = msg.writeTo(buf, writer);
+                }
+                else {
+                    MessageSerializer msgSer = 
msgFactory.serializer(msg.directType());
+
+                    finished = msgSer.writeTo(msg, buf, writer);
+                }
 
                 span.addTag(SOCKET_WRITE_BYTES, () -> 
Integer.toString(buf.position() - startPos));
 
@@ -3860,6 +3893,9 @@ public class GridNioServer<T> {
         /** Tracing processor */
         private Tracing tracing;
 
+        /** Message factory. */
+        private MessageFactory msgFactory;
+
         /**
          * Finishes building the instance.
          *
@@ -3891,6 +3927,7 @@ public class GridNioServer<T> {
                 workerLsnr,
                 mreg,
                 tracing,
+                msgFactory,
                 filters != null ? Arrays.copyOf(filters, filters.length) : 
EMPTY_FILTERS
             );
 
@@ -4165,6 +4202,16 @@ public class GridNioServer<T> {
 
             return this;
         }
+
+        /**
+         * @param msgFactory Message factory.
+         * @return This for chaining.
+         */
+        public Builder<T> messageFactory(MessageFactory msgFactory) {
+            this.msgFactory = msgFactory;
+
+            return this;
+        }
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
index 4979ce253b6..80f41b7910c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.plugin.extensions.communication;
 
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.plugin.Extension;
 
@@ -35,20 +34,18 @@ public interface MessageFormatter extends Extension {
     /**
      * Creates new message writer instance.
      *
-     * @param rmtNodeId Remote node ID.
      * @param msgFactory Message factory.
      * @return Message writer.
      * @throws IgniteCheckedException In case of error.
      */
-    public MessageWriter writer(UUID rmtNodeId, MessageFactory msgFactory) 
throws IgniteCheckedException;
+    public MessageWriter writer(MessageFactory msgFactory) throws 
IgniteCheckedException;
 
     /**
      * Creates new message reader instance.
      *
-     * @param rmtNodeId Remote node ID.
      * @param msgFactory Message factory.
      * @return Message reader.
      * @throws IgniteCheckedException In case of error.
      */
-    public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) 
throws IgniteCheckedException;
+    public MessageReader reader(MessageFactory msgFactory) throws 
IgniteCheckedException;
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index dfa7745d89b..bcb5b00c7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -776,11 +776,11 @@ public abstract class IgniteSpiAdapter implements 
IgniteSpi {
 
             if (msgFormatter0 == null) {
                 msgFormatter0 = new MessageFormatter() {
-                    @Override public MessageWriter writer(UUID rmtNodeId, 
MessageFactory msgFactory) {
+                    @Override public MessageWriter writer(MessageFactory 
msgFactory) {
                         throw new IgniteException("Failed to write message, 
node is not started.");
                     }
 
-                    @Override public MessageReader reader(UUID rmtNodeId, 
MessageFactory msgFactory) {
+                    @Override public MessageReader reader(MessageFactory 
msgFactory) {
                         throw new IgniteException("Failed to read message, 
node is not started.");
                     }
                 };
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 27e2d69dc23..c74814b76af 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 import java.util.BitSet;
 import java.util.Collection;
@@ -822,6 +821,11 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
         ctxInitLatch.countDown();
     }
 
+    /** @return {@code true} if {@code IgniteSpiContext} is initialized. */
+    public boolean spiContextInitialized() {
+        return ctxInitLatch.getCount() == 0;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteSpiContext getSpiContext() {
         if (ctxInitLatch.getCount() > 0) {
@@ -1169,17 +1173,6 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
         return S.toString(TcpCommunicationSpi.class, this);
     }
 
-    /**
-     * Write message type to byte buffer.
-     *
-     * @param buf Byte buffer.
-     * @param type Message type.
-     */
-    public static void writeMessageType(ByteBuffer buf, short type) {
-        buf.put((byte)(type & 0xFF));
-        buf.put((byte)((type >> 8) & 0xFF));
-    }
-
     /**
      * Concatenates the two parameter bytes to form a message type value.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
index f6fe46cb181..b75bc033d9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
@@ -140,6 +140,13 @@ public class ClusterStateProvider {
         return spiCtxWithoutLatchSupplier.get();
     }
 
+    /**
+     * @return {@code true} if {@code IgniteSpiContext} is available.
+     */
+    public boolean spiContextAvailable() {
+        return tcpCommSpi.spiContextInitialized();
+    }
+
     /**
      * @return Outbound messages queue size.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index d35471fa52a..c88a9ba00a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -50,6 +50,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManager;
 import org.apache.ignite.internal.managers.tracing.GridTracingManager;
 import org.apache.ignite.internal.processors.metric.GridMetricManager;
@@ -106,6 +108,7 @@ import static 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.CONN_IDX_META;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.CONSISTENT_ID_META;
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE;
 import static 
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.handshakeTimeoutException;
 import static 
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.isRecoverableException;
 import static 
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.nodeAddresses;
@@ -817,6 +820,10 @@ public class GridNioServerWrapper {
                     }
 
                     @Override public MessageSerializer serializer(short type) {
+                        // Allow sending a wait message to a communication 
peer while the context isn't initialized.
+                        if (impl == null && type == HANDSHAKE_WAIT_MSG_TYPE)
+                            return new HandshakeWaitMessageSerializer();
+
                         return get().serializer(type);
                     }
 
@@ -848,9 +855,7 @@ public class GridNioServerWrapper {
 
                         assert formatter != null;
 
-                        ConnectionKey key = ses.meta(CONN_IDX_META);
-
-                        return key != null ? formatter.reader(key.nodeId(), 
msgFactory) : null;
+                        return formatter.reader(msgFactory);
                     }
                 };
 
@@ -860,6 +865,10 @@ public class GridNioServerWrapper {
                     private MessageFormatter formatter;
 
                     @Override public MessageWriter writer(GridNioSession ses) 
throws IgniteCheckedException {
+                        // Allow sending a wait message to a communication 
peer while the context isn't initialized.
+                        if (!stateProvider.spiContextAvailable())
+                            return new DirectMessageWriter(msgFactory);
+
                         final IgniteSpiContext ctx = 
stateProvider.getSpiContextWithoutInitialLatch();
 
                         if (formatter == null || context != ctx) {
@@ -870,9 +879,7 @@ public class GridNioServerWrapper {
 
                         assert formatter != null;
 
-                        ConnectionKey key = ses.meta(CONN_IDX_META);
-
-                        return key != null ? formatter.writer(key.nodeId(), 
msgFactory) : null;
+                        return formatter.writer(msgFactory);
                     }
                 };
 
@@ -933,7 +940,8 @@ public class GridNioServerWrapper {
                     .skipRecoveryPredicate(skipRecoveryPred)
                     .messageQueueSizeListener(queueSizeMonitor)
                     .tracing(tracing)
-                    .readWriteSelectorsAssign(cfg.usePairedConnections());
+                    .readWriteSelectorsAssign(cfg.usePairedConnections())
+                    .messageFactory(msgFactory);
 
                 if (metricMgr != null) {
                     builder.workerListener(workersRegistry)
@@ -1199,7 +1207,7 @@ public class GridNioServerWrapper {
         handshakeTimeoutExecutorService.schedule(timeoutObj, timeout, 
TimeUnit.MILLISECONDS);
 
         try {
-            return tcpHandshakeExecutor.tcpHandshake(ch, rmtNodeId, sslMeta, 
msg);
+            return tcpHandshakeExecutor.tcpHandshake(ch, rmtNodeId, sslMeta, 
msg, stateProvider.getSpiContext());
         }
         finally {
             if (!timeoutObj.cancel())
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
index a288f95dcc7..b6cb5552c3b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
@@ -353,7 +353,7 @@ public class InboundConnectionHandler extends 
GridNioServerListenerAdapter<Messa
 
                     assert fut != null : msg;
 
-                    
fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
+                    fut.onConnected(((NodeIdMessage)msg).nodeId());
 
                     nioSrvWrapper.nio().closeFromWorkerThread(ses);
 
@@ -468,7 +468,7 @@ public class InboundConnectionHandler extends 
GridNioServerListenerAdapter<Messa
         ConnectionKey connKey;
 
         if (msg instanceof NodeIdMessage) {
-            sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
+            sndId = ((NodeIdMessage)msg).nodeId();
             connKey = new ConnectionKey(sndId, 0, -1);
         }
         else {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
index bd6cbc7cd73..12c1a932ab1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
@@ -25,12 +25,19 @@ import java.util.UUID;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import 
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
 import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
 import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
 import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
 import 
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
+import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE;
@@ -69,6 +76,7 @@ public class TcpHandshakeExecutor {
      * @param rmtNodeId Expected remote node.
      * @param sslMeta Required data for ssl.
      * @param msg Handshake message which should be sent during handshake.
+     * @param spiCtx Spi context.
      * @return Handshake response from predefined variants from {@link 
RecoveryLastReceivedMessage}.
      * @throws IgniteCheckedException If handshake failed.
      */
@@ -76,18 +84,17 @@ public class TcpHandshakeExecutor {
         SocketChannel ch,
         UUID rmtNodeId,
         GridSslMeta sslMeta,
-        HandshakeMessage msg
+        HandshakeMessage msg,
+        IgniteSpiContext spiCtx
     ) throws IgniteCheckedException {
         BlockingTransport transport = stateProvider.isSslEnabled() ?
-            new SslTransport(sslMeta, ch, directBuffer, log) : new 
TcpTransport(ch);
+            new SslTransport(sslMeta, ch, directBuffer, log, spiCtx) : new 
TcpTransport(ch, spiCtx);
 
-        ByteBuffer buf = transport.receiveNodeId();
+        UUID rmtNodeId0 = transport.receiveNodeId();
 
-        if (buf == null)
+        if (rmtNodeId0 == null)
             return NEED_WAIT;
 
-        UUID rmtNodeId0 = U.bytesToUuid(buf.array(), DIRECT_TYPE_SIZE);
-
         if (!rmtNodeId.equals(rmtNodeId0))
             throw new HandshakeException("Remote node ID is not as expected 
[expected=" + rmtNodeId + ", rcvd=" + rmtNodeId0 + ']');
         else if (log.isDebugEnabled())
@@ -98,9 +105,7 @@ public class TcpHandshakeExecutor {
 
         transport.sendHandshake(msg);
 
-        buf = transport.receiveAcknowledge();
-
-        long rcvCnt = buf.getLong(DIRECT_TYPE_SIZE);
+        long rcvCnt = transport.receiveAcknowledge();
 
         if (log.isDebugEnabled())
             log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", 
rcvCnt=" + rcvCnt + ']');
@@ -119,13 +124,29 @@ public class TcpHandshakeExecutor {
      * Encapsulates handshake logic.
      */
     private abstract static class BlockingTransport {
+        /** Message reader. */
+        private final MessageReader reader;
+
+        /** Message writer. */
+        private final MessageWriter writer;
+
+        /** Message factory. */
+        private final MessageFactory msgFactory;
+
+        /** */
+        BlockingTransport(IgniteSpiContext spiCtx) throws 
IgniteCheckedException {
+            msgFactory = spiCtx.messageFactory();
+            reader = spiCtx.messageFormatter().reader(msgFactory);
+            writer = spiCtx.messageFormatter().writer(msgFactory);
+        }
+
         /**
          * Receive {@link NodeIdMessage}.
          *
-         * @return Buffer with {@link NodeIdMessage}.
+         * @return UUID from {@link NodeIdMessage}, or {@code null} if a 
handshake wait message was received instead.
          * @throws IgniteCheckedException If failed.
          */
-        ByteBuffer receiveNodeId() throws IgniteCheckedException {
+        @Nullable UUID receiveNodeId() throws IgniteCheckedException {
             ByteBuffer buf = 
ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE)
                     .order(ByteOrder.LITTLE_ENDIAN);
 
@@ -140,12 +161,21 @@ public class TcpHandshakeExecutor {
 
                     if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                         return null;
+
+                    assert msgType == TcpCommunicationSpi.NODE_ID_MSG_TYPE;
                 }
 
                 totalBytes += readBytes;
             }
 
-            return buf;
+            buf.position(DIRECT_TYPE_SIZE);
+
+            NodeIdMessage nodeIdMsg = new NodeIdMessage();
+
+            msgFactory.serializer(nodeIdMsg.directType()).readFrom(nodeIdMsg, 
buf, reader);
+            reader.reset();
+
+            return nodeIdMsg.nodeId();
         }
 
         /**
@@ -159,7 +189,8 @@ public class TcpHandshakeExecutor {
                     .order(ByteOrder.LITTLE_ENDIAN)
                     .put(U.IGNITE_HEADER);
 
-            msg.writeTo(buf, null);
+            msgFactory.serializer(msg.directType()).writeTo(msg, buf, writer);
+
             buf.flip();
 
             write(buf);
@@ -168,24 +199,54 @@ public class TcpHandshakeExecutor {
         /**
          * Receive {@link RecoveryLastReceivedMessage} acknowledge message.
          *
-         * @return Buffer with message.
-         * @throws IgniteCheckedException If failed.
+         * @return Received count.
          */
-        ByteBuffer receiveAcknowledge() throws IgniteCheckedException {
+        long receiveAcknowledge() throws IgniteCheckedException {
             ByteBuffer buf = 
ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE)
                     .order(ByteOrder.LITTLE_ENDIAN);
 
-            for (int totalBytes = 0; totalBytes < 
RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                int readBytes = read(buf);
+            boolean fininshed = false;
+
+            RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage();
+            RecoveryLastReceivedMessageSerializer msgSer =
+                
(RecoveryLastReceivedMessageSerializer)msgFactory.serializer(msg.directType());
+
+            short msgType = 0;
+            int readPos = 0;
+            int readBytes = 0;
+
+            // Might read less than MESSAGE_FULL_SIZE, due to optimization for 
writing long values.
+            // For this reason read byte by byte until finished, to avoid the 
case when we read more than needed.
+            while (!fininshed) {
+                // Read byte by byte.
+                buf.limit(buf.position() + 1);
+
+                readBytes += read(buf);
 
                 if (readBytes == -1)
-                    throw new HandshakeException("Failed to read remote node 
recovery handshake " +
-                            "(connection closed).");
+                    throw new HandshakeException("Failed to read remote node 
recovery handshake (connection closed).");
 
-                totalBytes += readBytes;
+                if (msgType == 0 && readBytes >= DIRECT_TYPE_SIZE) {
+                    msgType = makeMessageType(buf.get(0), buf.get(1));
+
+                    assert msgType == msg.directType();
+
+                    readPos = DIRECT_TYPE_SIZE;
+                }
+
+                if (msgType == 0)
+                    continue;
+
+                buf.position(readPos);
+
+                fininshed = msgSer.readFrom(msg, buf, reader);
+
+                readPos = buf.position();
             }
 
-            return buf;
+            reader.reset();
+
+            return msg.received();
         }
 
         /**
@@ -222,7 +283,9 @@ public class TcpHandshakeExecutor {
         private final SocketChannel ch;
 
         /** */
-        TcpTransport(SocketChannel ch) {
+        TcpTransport(SocketChannel ch, IgniteSpiContext spiCtx) throws 
IgniteCheckedException {
+            super(spiCtx);
+
             this.ch = ch;
         }
 
@@ -262,7 +325,15 @@ public class TcpHandshakeExecutor {
         private final ByteBuffer readBuf;
 
         /** */
-        SslTransport(GridSslMeta meta, SocketChannel ch, boolean directBuf, 
IgniteLogger log) throws IgniteCheckedException {
+        SslTransport(
+            GridSslMeta meta,
+            SocketChannel ch,
+            boolean directBuf,
+            IgniteLogger log,
+            IgniteSpiContext spiCtx
+        ) throws IgniteCheckedException {
+            super(spiCtx);
+
             try {
                 this.ch = ch;
                 handler = new BlockingSslHandler(meta.sslEngine(), ch, 
directBuf, ByteOrder.LITTLE_ENDIAN, log);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
index 1cdfde15e46..7951170e028 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
@@ -17,37 +17,36 @@
 
 package org.apache.ignite.spi.communication.tcp.messages;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 
 /**
  * Handshake message.
  */
-@IgniteCodeGeneratingFail
 public class HandshakeMessage implements Message {
     /** Message body size in bytes. */
-    private static final int MESSAGE_SIZE = 36;
+    private static final int MESSAGE_SIZE = 36 + 1; // additional byte for 
null flag of UUID value.
 
     /** Full message size (with message type) in bytes. */
     public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + 
DIRECT_TYPE_SIZE;
 
     /** */
+    @Order(0)
     private UUID nodeId;
 
     /** */
+    @Order(value = 1, method = "received")
     private long rcvCnt;
 
     /** */
+    @Order(value = 2, method = "connectCount")
     private long connectCnt;
 
     /** */
+    @Order(value = 3, method = "connectionIndex")
     private int connIdx;
 
     /**
@@ -102,52 +101,38 @@ public class HandshakeMessage implements Message {
     }
 
     /**
-     * @return Message size in bytes.
+     * @param connIdx Connection index.
      */
-    public int getMessageSize() {
-        return MESSAGE_FULL_SIZE;
+    public void connectionIndex(int connIdx) {
+        this.connIdx = connIdx;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        byte[] bytes = U.uuidToBytes(nodeId);
-
-        assert bytes.length == 16 : bytes.length;
-
-        buf.put(bytes);
-
-        buf.putLong(rcvCnt);
-
-        buf.putLong(connectCnt);
-
-        buf.putInt(connIdx);
-
-        return true;
+    /**
+     * @param connectCnt Connect count.
+     */
+    public void connectCount(long connectCnt) {
+        this.connectCnt = connectCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (buf.remaining() < MESSAGE_SIZE)
-            return false;
-
-        byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
-
-        buf.get(nodeIdBytes);
-
-        nodeId = U.bytesToUuid(nodeIdBytes, 0);
-
-        rcvCnt = buf.getLong();
-
-        connectCnt = buf.getLong();
+    /**
+     * @param rcvCnt Number of received messages.
+     */
+    public void received(long rcvCnt) {
+        this.rcvCnt = rcvCnt;
+    }
 
-        connIdx = buf.getInt();
+    /**
+     * @param nodeId Node ID.
+     */
+    public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
 
-        return true;
+    /**
+     * @return Message size in bytes.
+     */
+    public int getMessageSize() {
+        return MESSAGE_FULL_SIZE;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
index 4c4f052e76d..b1f95a8fb92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
@@ -17,17 +17,13 @@
 
 package org.apache.ignite.spi.communication.tcp.messages;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE;
 
 /**
- * Message requesting to wait until node's SPI context initialize.
+ * Message requesting to wait until node's SPI context initialize.
  */
 public class HandshakeWaitMessage implements Message {
     /** Full message size (with message type) in bytes. */
@@ -40,21 +36,6 @@ public class HandshakeWaitMessage implements Message {
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        return true;
-    }
-
     /** {@inheritDoc} */
     @Override public short directType() {
         return HANDSHAKE_WAIT_MSG_TYPE;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
index ac935a8375b..8b28a129d68 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
@@ -17,29 +17,25 @@
 
 package org.apache.ignite.spi.communication.tcp.messages;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 
 /**
  * Node ID message.
  */
-@IgniteCodeGeneratingFail
 public class NodeIdMessage implements Message {
     /** Message body size (with message type) in bytes. */
-    static final int MESSAGE_SIZE = 16;
+    static final int MESSAGE_SIZE = 1 + 16;  // null flag, UUID value.
 
     /** Full message size (with message type) in bytes. */
     public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + 
DIRECT_TYPE_SIZE;
 
     /** */
-    private byte[] nodeIdBytes;
+    @Order(0)
+    private UUID nodeId;
 
     /** */
     public NodeIdMessage() {
@@ -52,42 +48,21 @@ public class NodeIdMessage implements Message {
     public NodeIdMessage(UUID nodeId) {
         assert nodeId != null;
 
-        nodeIdBytes = U.uuidToBytes(nodeId);
-
-        assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + 
MESSAGE_SIZE;
+        this.nodeId = nodeId;
     }
 
     /**
      * @return Node ID bytes.
      */
-    public byte[] nodeIdBytes() {
-        return nodeIdBytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        assert nodeIdBytes.length == MESSAGE_SIZE;
-
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        buf.put(nodeIdBytes);
-
-        return true;
+    public UUID nodeId() {
+        return nodeId;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (buf.remaining() < MESSAGE_SIZE)
-            return false;
-
-        nodeIdBytes = new byte[MESSAGE_SIZE];
-
-        buf.get(nodeIdBytes);
-
-        return true;
+    /**
+     * @param nodeId Node ID.
+     */
+    public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
index 3df65ddb57d..76c961ed0e1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
@@ -17,18 +17,14 @@
 
 package org.apache.ignite.spi.communication.tcp.messages;
 
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 
 /**
  * Recovery acknowledgment message.
  */
-@IgniteCodeGeneratingFail
 public class RecoveryLastReceivedMessage implements Message {
     /** */
     public static final long ALREADY_CONNECTED = -1;
@@ -49,6 +45,7 @@ public class RecoveryLastReceivedMessage implements Message {
     public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + 
DIRECT_TYPE_SIZE;
 
     /** */
+    @Order(value = 0, method = "received")
     private long rcvCnt;
 
     /**
@@ -72,26 +69,11 @@ public class RecoveryLastReceivedMessage implements Message 
{
         return rcvCnt;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        buf.putLong(rcvCnt);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (buf.remaining() < MESSAGE_SIZE)
-            return false;
-
-        rcvCnt = buf.getLong();
-
-        return true;
+    /**
+     * @param rcvCnt Number of received messages.
+     */
+    public void received(long rcvCnt) {
+        this.rcvCnt = rcvCnt;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
index 0657da4a940..6750812a6a1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import java.util.UUID;
 import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -35,11 +35,8 @@ public class IgniteIoCommunicationMessageSerializationTest 
extends AbstractCommu
 
     /** {@inheritDoc} */
     @Override protected Message initializeMessage(Message msg) throws 
Exception {
-        if (msg instanceof NodeIdMessage) {
-            int msgSize = U.field(NodeIdMessage.class, "MESSAGE_SIZE");
-
-            FieldUtils.writeField(msg, "nodeIdBytes", new byte[msgSize], true);
-        }
+        if (msg instanceof NodeIdMessage)
+            FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true);
 
         return msg;
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
index 22ed899f373..46c0e85fbbb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.resource.DependencyResolver;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
+import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.communication.tcp.internal.TcpHandshakeExecutor;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -136,7 +137,7 @@ public class TcpCommunicationHandshakeTimeoutTest extends 
GridCommonAbstractTest
 
         /** {@inheritDoc} */
         @Override public long tcpHandshake(SocketChannel ch, UUID rmtNodeId, 
GridSslMeta sslMeta,
-            HandshakeMessage msg) throws IgniteCheckedException {
+            HandshakeMessage msg, IgniteSpiContext spiCtx) throws 
IgniteCheckedException {
             if (needToDelayd.get()) {
                 needToDelayd.set(false);
 
@@ -144,7 +145,7 @@ public class TcpCommunicationHandshakeTimeoutTest extends 
GridCommonAbstractTest
                     LockSupport.parkNanos(10_000_000);
             }
 
-            return delegate.tcpHandshake(ch, rmtNodeId, sslMeta, msg);
+            return delegate.tcpHandshake(ch, rmtNodeId, sslMeta, msg, spiCtx);
         }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 7f5d876e21f..bd14ef0e300 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -543,11 +543,11 @@ public class GridSpiTestContext implements 
IgniteSpiContext {
     @Override public MessageFormatter messageFormatter() {
         if (formatter == null) {
             formatter = new MessageFormatter() {
-                @Override public MessageWriter writer(UUID rmtNodeId, 
MessageFactory msgFactory) {
+                @Override public MessageWriter writer(MessageFactory 
msgFactory) {
                     return new DirectMessageWriter(msgFactory);
                 }
 
-                @Override public MessageReader reader(UUID rmtNodeId, 
MessageFactory msgFactory) {
+                @Override public MessageReader reader(MessageFactory 
msgFactory) {
                     return new DirectMessageReader(msgFactory, null);
                 }
             };

Reply via email to