This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6c0df41f0b2 IGNITE-25482 Use MessageSerializer for handshake messages
(#12380)
6c0df41f0b2 is described below
commit 6c0df41f0b2f76899da11e7bb99612ac1d9d4bba
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Oct 17 18:04:25 2025 +0300
IGNITE-25482 Use MessageSerializer for handshake messages (#12380)
---
.../apache/ignite/internal/MessageProcessor.java | 6 +-
.../internal/MessageSerializerGenerator.java | 7 +-
.../managers/communication/GridIoManager.java | 6 +-
.../communication/GridIoMessageFactory.java | 13 ++-
.../ignite/internal/util/nio/GridDirectParser.java | 8 +-
.../ignite/internal/util/nio/GridNioServer.java | 51 ++++++++-
.../extensions/communication/MessageFormatter.java | 7 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 4 +-
.../spi/communication/tcp/TcpCommunicationSpi.java | 17 +--
.../tcp/internal/ClusterStateProvider.java | 7 ++
.../tcp/internal/GridNioServerWrapper.java | 24 +++--
.../tcp/internal/InboundConnectionHandler.java | 4 +-
.../tcp/internal/TcpHandshakeExecutor.java | 119 ++++++++++++++++-----
.../tcp/messages/HandshakeMessage.java | 77 ++++++-------
.../tcp/messages/HandshakeWaitMessage.java | 21 +---
.../communication/tcp/messages/NodeIdMessage.java | 49 +++------
.../tcp/messages/RecoveryLastReceivedMessage.java | 32 ++----
...iteIoCommunicationMessageSerializationTest.java | 9 +-
.../tcp/TcpCommunicationHandshakeTimeoutTest.java | 5 +-
.../ignite/testframework/GridSpiTestContext.java | 4 +-
20 files changed, 263 insertions(+), 207 deletions(-)
diff --git
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
index 177d8a87ad2..afbc411bb32 100644
---
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
+++
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageProcessor.java
@@ -68,11 +68,15 @@ public class MessageProcessor extends AbstractProcessor {
/** Base interface that every message must implement. */
static final String MESSAGE_INTERFACE =
"org.apache.ignite.plugin.extensions.communication.Message";
+ /** This is the only message with zero fields. A serializer must be
generated due to restrictions in our communication process. */
+ static final String HANDSHAKE_WAIT_MESSAGE =
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
+
/**
* Processes all classes implementing the {@code Message} interface and
generates corresponding serializer code.
*/
@Override public boolean process(Set<? extends TypeElement> annotations,
RoundEnvironment roundEnv) {
TypeMirror msgType =
processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
+ TypeMirror handshakeWaitMsgType =
processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();
@@ -90,7 +94,7 @@ public class MessageProcessor extends AbstractProcessor {
List<VariableElement> fields = orderedFields(clazz);
- if (!fields.isEmpty())
+ if (!fields.isEmpty() ||
processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
msgFields.put(clazz, fields);
}
diff --git
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index d052aeb1dda..2ad8b96c728 100644
---
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -650,9 +650,10 @@ class MessageSerializerGenerator {
/** */
private void finish(List<String> code) {
- // Remove the last empty line for the last "case".
- String removed = code.remove(code.size() - 1);
- assert EMPTY.equals(removed) : removed;
+ String lastLine = code.get(code.size() - 1);
+
+ if (EMPTY.equals(lastLine))
+ code.remove(code.size() - 1);
code.add(line("}"));
code.add(EMPTY);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index f2f23c443c3..280f4fb1e77 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -433,13 +433,11 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
}
else {
formatter = new MessageFormatter() {
- @Override public MessageWriter writer(UUID rmtNodeId,
MessageFactory msgFactory) {
- assert rmtNodeId != null;
-
+ @Override public MessageWriter writer(MessageFactory
msgFactory) {
return new DirectMessageWriter(msgFactory);
}
- @Override public MessageReader reader(UUID rmtNodeId,
MessageFactory msgFactory) {
+ @Override public MessageReader reader(MessageFactory
msgFactory) {
return new DirectMessageReader(msgFactory,
ctx.cacheObjects());
}
};
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 920eef60d96..752448712fc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -87,6 +87,8 @@ import
org.apache.ignite.internal.codegen.GridQueryNextPageRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryNextPageResponseSerializer;
import org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
+import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
+import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
import
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
import
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
@@ -96,6 +98,8 @@ import
org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
import
org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer;
import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
+import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
+import
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
import
org.apache.ignite.internal.codegen.ServiceSingleNodeDeploymentResultBatchSerializer;
import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
@@ -271,10 +275,11 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)-25, TxLock::new, new TxLockSerializer());
factory.register((short)-24, TxLocksRequest::new, new
TxLocksRequestSerializer());
factory.register((short)-23, TxLocksResponse::new, new
TxLocksResponseSerializer());
- factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE,
NodeIdMessage::new);
- factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE,
RecoveryLastReceivedMessage::new);
- factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE,
HandshakeMessage::new);
- factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE,
HandshakeWaitMessage::new);
+ factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE,
NodeIdMessage::new, new NodeIdMessageSerializer());
+ factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE,
RecoveryLastReceivedMessage::new,
+ new RecoveryLastReceivedMessageSerializer());
+ factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE,
HandshakeMessage::new, new HandshakeMessageSerializer());
+ factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE,
HandshakeWaitMessage::new, new HandshakeWaitMessageSerializer());
factory.register((short)0, GridJobCancelRequest::new, new
GridJobCancelRequestSerializer());
factory.register((short)1, GridJobExecuteRequest::new);
factory.register((short)2, GridJobExecuteResponse::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index cde984913a8..65db20856c9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
@@ -83,8 +84,11 @@ public class GridDirectParser implements GridNioParser {
boolean finished = false;
- if (msg != null && buf.hasRemaining())
- finished = msg.readFrom(buf, reader);
+ if (msg != null && buf.hasRemaining()) {
+ MessageSerializer msgSer =
msgFactory.serializer(msg.directType());
+
+ finished = msgSer.readFrom(msg, buf, reader);
+ }
if (finished) {
if (reader != null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index aaa78cd1769..849e6485eae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -57,6 +57,7 @@ import
org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
@@ -85,7 +86,9 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -289,6 +292,9 @@ public class GridNioServer<T> {
/** Tracing processor. */
private Tracing tracing;
+ /** Message factory. */
+ private final MessageFactory msgFactory;
+
/**
* @param addr Address.
* @param port Port.
@@ -341,6 +347,7 @@ public class GridNioServer<T> {
@Nullable GridWorkerListener workerLsnr,
@Nullable MetricRegistryImpl mreg,
Tracing tracing,
+ MessageFactory msgFactory,
GridNioFilter... filters
) throws IgniteCheckedException {
if (port != -1)
@@ -368,6 +375,7 @@ public class GridNioServer<T> {
this.readWriteSelectorsAssign = readWriteSelectorsAssign;
this.lsnr = lsnr;
this.tracing = tracing == null ? new NoopTracing() : tracing;
+ this.msgFactory = msgFactory;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(),
filters);
@@ -543,6 +551,13 @@ public class GridNioServer<T> {
return locAddr;
}
+ /**
+ * @return Message factory.
+ */
+ public MessageFactory messageFactory() {
+ return msgFactory;
+ }
+
/**
* @return Selector spins.
*/
@@ -1593,7 +1608,16 @@ public class GridNioServer<T> {
int startPos = buf.position();
- finished = msg.writeTo(buf, writer);
+ if (messageFactory() == null) {
+ assert msg instanceof ClientMessage; // TODO: Will
refactor in IGNITE-26554.
+
+ finished = msg.writeTo(buf, writer);
+ }
+ else {
+ MessageSerializer msgSer =
messageFactory().serializer(msg.directType());
+
+ finished = msgSer.writeTo(msg, buf, writer);
+ }
span.addTag(SOCKET_WRITE_BYTES, () ->
Integer.toString(buf.position() - startPos));
@@ -1783,7 +1807,16 @@ public class GridNioServer<T> {
int startPos = buf.position();
- finished = msg.writeTo(buf, writer);
+ if (msgFactory == null) {
+ assert msg instanceof ClientMessage; // TODO: Will
refactor in IGNITE-26554.
+
+ finished = msg.writeTo(buf, writer);
+ }
+ else {
+ MessageSerializer msgSer =
msgFactory.serializer(msg.directType());
+
+ finished = msgSer.writeTo(msg, buf, writer);
+ }
span.addTag(SOCKET_WRITE_BYTES, () ->
Integer.toString(buf.position() - startPos));
@@ -3860,6 +3893,9 @@ public class GridNioServer<T> {
/** Tracing processor */
private Tracing tracing;
+ /** Message factory. */
+ private MessageFactory msgFactory;
+
/**
* Finishes building the instance.
*
@@ -3891,6 +3927,7 @@ public class GridNioServer<T> {
workerLsnr,
mreg,
tracing,
+ msgFactory,
filters != null ? Arrays.copyOf(filters, filters.length) :
EMPTY_FILTERS
);
@@ -4165,6 +4202,16 @@ public class GridNioServer<T> {
return this;
}
+
+ /**
+ * @param msgFactory Message factory.
+ * @return This for chaining.
+ */
+ public Builder<T> messageFactory(MessageFactory msgFactory) {
+ this.msgFactory = msgFactory;
+
+ return this;
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
index 4979ce253b6..80f41b7910c 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
@@ -17,7 +17,6 @@
package org.apache.ignite.plugin.extensions.communication;
-import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.plugin.Extension;
@@ -35,20 +34,18 @@ public interface MessageFormatter extends Extension {
/**
* Creates new message writer instance.
*
- * @param rmtNodeId Remote node ID.
* @param msgFactory Message factory.
* @return Message writer.
* @throws IgniteCheckedException In case of error.
*/
- public MessageWriter writer(UUID rmtNodeId, MessageFactory msgFactory)
throws IgniteCheckedException;
+ public MessageWriter writer(MessageFactory msgFactory) throws
IgniteCheckedException;
/**
* Creates new message reader instance.
*
- * @param rmtNodeId Remote node ID.
* @param msgFactory Message factory.
* @return Message reader.
* @throws IgniteCheckedException In case of error.
*/
- public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
throws IgniteCheckedException;
+ public MessageReader reader(MessageFactory msgFactory) throws
IgniteCheckedException;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index dfa7745d89b..bcb5b00c7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -776,11 +776,11 @@ public abstract class IgniteSpiAdapter implements
IgniteSpi {
if (msgFormatter0 == null) {
msgFormatter0 = new MessageFormatter() {
- @Override public MessageWriter writer(UUID rmtNodeId,
MessageFactory msgFactory) {
+ @Override public MessageWriter writer(MessageFactory
msgFactory) {
throw new IgniteException("Failed to write message,
node is not started.");
}
- @Override public MessageReader reader(UUID rmtNodeId,
MessageFactory msgFactory) {
+ @Override public MessageReader reader(MessageFactory
msgFactory) {
throw new IgniteException("Failed to read message,
node is not started.");
}
};
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 27e2d69dc23..c74814b76af 100755
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.util.BitSet;
import java.util.Collection;
@@ -822,6 +821,11 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
ctxInitLatch.countDown();
}
+ /** @return {@code true} if {@code IgniteSpiContext} is initialized. */
+ public boolean spiContextInitialized() {
+ return ctxInitLatch.getCount() == 0;
+ }
+
/** {@inheritDoc} */
@Override public IgniteSpiContext getSpiContext() {
if (ctxInitLatch.getCount() > 0) {
@@ -1169,17 +1173,6 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
return S.toString(TcpCommunicationSpi.class, this);
}
- /**
- * Write message type to byte buffer.
- *
- * @param buf Byte buffer.
- * @param type Message type.
- */
- public static void writeMessageType(ByteBuffer buf, short type) {
- buf.put((byte)(type & 0xFF));
- buf.put((byte)((type >> 8) & 0xFF));
- }
-
/**
* Concatenates the two parameter bytes to form a message type value.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
index f6fe46cb181..b75bc033d9a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ClusterStateProvider.java
@@ -140,6 +140,13 @@ public class ClusterStateProvider {
return spiCtxWithoutLatchSupplier.get();
}
+ /**
+ * @return {@code true} if {@code IgniteSpiContext} is available.
+ */
+ public boolean spiContextAvailable() {
+ return tcpCommSpi.spiContextInitialized();
+ }
+
/**
* @return Outbound messages queue size.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index d35471fa52a..c88a9ba00a8 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -50,6 +50,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.GridManager;
import org.apache.ignite.internal.managers.tracing.GridTracingManager;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
@@ -106,6 +108,7 @@ import static
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.CONN_IDX_META;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.CONSISTENT_ID_META;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE;
import static
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.handshakeTimeoutException;
import static
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.isRecoverableException;
import static
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.nodeAddresses;
@@ -817,6 +820,10 @@ public class GridNioServerWrapper {
}
@Override public MessageSerializer serializer(short type) {
+ // Enable sending wait message for a communication
peer while context isn't initialized.
+ if (impl == null && type == HANDSHAKE_WAIT_MSG_TYPE)
+ return new HandshakeWaitMessageSerializer();
+
return get().serializer(type);
}
@@ -848,9 +855,7 @@ public class GridNioServerWrapper {
assert formatter != null;
- ConnectionKey key = ses.meta(CONN_IDX_META);
-
- return key != null ? formatter.reader(key.nodeId(),
msgFactory) : null;
+ return formatter.reader(msgFactory);
}
};
@@ -860,6 +865,10 @@ public class GridNioServerWrapper {
private MessageFormatter formatter;
@Override public MessageWriter writer(GridNioSession ses)
throws IgniteCheckedException {
+ // Enable sending wait message for a communication
peer while context isn't initialized.
+ if (!stateProvider.spiContextAvailable())
+ return new DirectMessageWriter(msgFactory);
+
final IgniteSpiContext ctx =
stateProvider.getSpiContextWithoutInitialLatch();
if (formatter == null || context != ctx) {
@@ -870,9 +879,7 @@ public class GridNioServerWrapper {
assert formatter != null;
- ConnectionKey key = ses.meta(CONN_IDX_META);
-
- return key != null ? formatter.writer(key.nodeId(),
msgFactory) : null;
+ return formatter.writer(msgFactory);
}
};
@@ -933,7 +940,8 @@ public class GridNioServerWrapper {
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
.tracing(tracing)
- .readWriteSelectorsAssign(cfg.usePairedConnections());
+ .readWriteSelectorsAssign(cfg.usePairedConnections())
+ .messageFactory(msgFactory);
if (metricMgr != null) {
builder.workerListener(workersRegistry)
@@ -1199,7 +1207,7 @@ public class GridNioServerWrapper {
handshakeTimeoutExecutorService.schedule(timeoutObj, timeout,
TimeUnit.MILLISECONDS);
try {
- return tcpHandshakeExecutor.tcpHandshake(ch, rmtNodeId, sslMeta,
msg);
+ return tcpHandshakeExecutor.tcpHandshake(ch, rmtNodeId, sslMeta,
msg, stateProvider.getSpiContext());
}
finally {
if (!timeoutObj.cancel())
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
index a288f95dcc7..b6cb5552c3b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/InboundConnectionHandler.java
@@ -353,7 +353,7 @@ public class InboundConnectionHandler extends
GridNioServerListenerAdapter<Messa
assert fut != null : msg;
-
fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
+ fut.onConnected(((NodeIdMessage)msg).nodeId());
nioSrvWrapper.nio().closeFromWorkerThread(ses);
@@ -468,7 +468,7 @@ public class InboundConnectionHandler extends
GridNioServerListenerAdapter<Messa
ConnectionKey connKey;
if (msg instanceof NodeIdMessage) {
- sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
+ sndId = ((NodeIdMessage)msg).nodeId();
connKey = new ConnectionKey(sndId, 0, -1);
}
else {
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
index bd6cbc7cd73..12c1a932ab1 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java
@@ -25,12 +25,19 @@ import java.util.UUID;
import javax.net.ssl.SSLException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
+import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE;
@@ -69,6 +76,7 @@ public class TcpHandshakeExecutor {
* @param rmtNodeId Expected remote node.
* @param sslMeta Required data for ssl.
* @param msg Handshake message which should be sent during handshake.
+ * @param spiCtx Spi context.
* @return Handshake response from predefined variants from {@link
RecoveryLastReceivedMessage}.
* @throws IgniteCheckedException If handshake failed.
*/
@@ -76,18 +84,17 @@ public class TcpHandshakeExecutor {
SocketChannel ch,
UUID rmtNodeId,
GridSslMeta sslMeta,
- HandshakeMessage msg
+ HandshakeMessage msg,
+ IgniteSpiContext spiCtx
) throws IgniteCheckedException {
BlockingTransport transport = stateProvider.isSslEnabled() ?
- new SslTransport(sslMeta, ch, directBuffer, log) : new
TcpTransport(ch);
+ new SslTransport(sslMeta, ch, directBuffer, log, spiCtx) : new
TcpTransport(ch, spiCtx);
- ByteBuffer buf = transport.receiveNodeId();
+ UUID rmtNodeId0 = transport.receiveNodeId();
- if (buf == null)
+ if (rmtNodeId0 == null)
return NEED_WAIT;
- UUID rmtNodeId0 = U.bytesToUuid(buf.array(), DIRECT_TYPE_SIZE);
-
if (!rmtNodeId.equals(rmtNodeId0))
throw new HandshakeException("Remote node ID is not as expected
[expected=" + rmtNodeId + ", rcvd=" + rmtNodeId0 + ']');
else if (log.isDebugEnabled())
@@ -98,9 +105,7 @@ public class TcpHandshakeExecutor {
transport.sendHandshake(msg);
- buf = transport.receiveAcknowledge();
-
- long rcvCnt = buf.getLong(DIRECT_TYPE_SIZE);
+ long rcvCnt = transport.receiveAcknowledge();
if (log.isDebugEnabled())
log.debug("Received handshake message [rmtNode=" + rmtNodeId + ",
rcvCnt=" + rcvCnt + ']');
@@ -119,13 +124,29 @@ public class TcpHandshakeExecutor {
* Encapsulates handshake logic.
*/
private abstract static class BlockingTransport {
+ /** Message reader. */
+ private final MessageReader reader;
+
+ /** Message writer. */
+ private final MessageWriter writer;
+
+ /** Message factory. */
+ private final MessageFactory msgFactory;
+
+ /** */
+ BlockingTransport(IgniteSpiContext spiCtx) throws
IgniteCheckedException {
+ msgFactory = spiCtx.messageFactory();
+ reader = spiCtx.messageFormatter().reader(msgFactory);
+ writer = spiCtx.messageFormatter().writer(msgFactory);
+ }
+
/**
* Receive {@link NodeIdMessage}.
*
- * @return Buffer with {@link NodeIdMessage}.
+ * @return UUID from {@link NodeIdMessage}, or {@code null} if need
wait.
* @throws IgniteCheckedException If failed.
*/
- ByteBuffer receiveNodeId() throws IgniteCheckedException {
+ @Nullable UUID receiveNodeId() throws IgniteCheckedException {
ByteBuffer buf =
ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE)
.order(ByteOrder.LITTLE_ENDIAN);
@@ -140,12 +161,21 @@ public class TcpHandshakeExecutor {
if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
return null;
+
+ assert msgType == TcpCommunicationSpi.NODE_ID_MSG_TYPE;
}
totalBytes += readBytes;
}
- return buf;
+ buf.position(DIRECT_TYPE_SIZE);
+
+ NodeIdMessage nodeIdMsg = new NodeIdMessage();
+
+ msgFactory.serializer(nodeIdMsg.directType()).readFrom(nodeIdMsg,
buf, reader);
+ reader.reset();
+
+ return nodeIdMsg.nodeId();
}
/**
@@ -159,7 +189,8 @@ public class TcpHandshakeExecutor {
.order(ByteOrder.LITTLE_ENDIAN)
.put(U.IGNITE_HEADER);
- msg.writeTo(buf, null);
+ msgFactory.serializer(msg.directType()).writeTo(msg, buf, writer);
+
buf.flip();
write(buf);
@@ -168,24 +199,54 @@ public class TcpHandshakeExecutor {
/**
* Receive {@link RecoveryLastReceivedMessage} acknowledge message.
*
- * @return Buffer with message.
- * @throws IgniteCheckedException If failed.
+ * @return Received count.
*/
- ByteBuffer receiveAcknowledge() throws IgniteCheckedException {
+ long receiveAcknowledge() throws IgniteCheckedException {
ByteBuffer buf =
ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE)
.order(ByteOrder.LITTLE_ENDIAN);
- for (int totalBytes = 0; totalBytes <
RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int readBytes = read(buf);
+ boolean fininshed = false;
+
+ RecoveryLastReceivedMessage msg = new
RecoveryLastReceivedMessage();
+ RecoveryLastReceivedMessageSerializer msgSer =
+
(RecoveryLastReceivedMessageSerializer)msgFactory.serializer(msg.directType());
+
+ short msgType = 0;
+ int readPos = 0;
+ int readBytes = 0;
+
+ // Might read less than MESSAGE_FULL_SIZE, due to optimizaton for
writing long values.
+ // For this reason read byte by byte while not finished. To avoid
case when we read more than needed.
+ while (!fininshed) {
+ // Read byte by byte.
+ buf.limit(buf.position() + 1);
+
+ readBytes += read(buf);
if (readBytes == -1)
- throw new HandshakeException("Failed to read remote node
recovery handshake " +
- "(connection closed).");
+ throw new HandshakeException("Failed to read remote node
recovery handshake (connection closed).");
- totalBytes += readBytes;
+ if (msgType == 0 && readBytes >= DIRECT_TYPE_SIZE) {
+ msgType = makeMessageType(buf.get(0), buf.get(1));
+
+ assert msgType == msg.directType();
+
+ readPos = DIRECT_TYPE_SIZE;
+ }
+
+ if (msgType == 0)
+ continue;
+
+ buf.position(readPos);
+
+ fininshed = msgSer.readFrom(msg, buf, reader);
+
+ readPos = buf.position();
}
- return buf;
+ reader.reset();
+
+ return msg.received();
}
/**
@@ -222,7 +283,9 @@ public class TcpHandshakeExecutor {
private final SocketChannel ch;
/** */
- TcpTransport(SocketChannel ch) {
+ TcpTransport(SocketChannel ch, IgniteSpiContext spiCtx) throws
IgniteCheckedException {
+ super(spiCtx);
+
this.ch = ch;
}
@@ -262,7 +325,15 @@ public class TcpHandshakeExecutor {
private final ByteBuffer readBuf;
/** */
- SslTransport(GridSslMeta meta, SocketChannel ch, boolean directBuf,
IgniteLogger log) throws IgniteCheckedException {
+ SslTransport(
+ GridSslMeta meta,
+ SocketChannel ch,
+ boolean directBuf,
+ IgniteLogger log,
+ IgniteSpiContext spiCtx
+ ) throws IgniteCheckedException {
+ super(spiCtx);
+
try {
this.ch = ch;
handler = new BlockingSslHandler(meta.sslEngine(), ch,
directBuf, ByteOrder.LITTLE_ENDIAN, log);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
index 1cdfde15e46..7951170e028 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
@@ -17,37 +17,36 @@
package org.apache.ignite.spi.communication.tcp.messages;
-import java.nio.ByteBuffer;
import java.util.UUID;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
/**
* Handshake message.
*/
-@IgniteCodeGeneratingFail
public class HandshakeMessage implements Message {
/** Message body size in bytes. */
- private static final int MESSAGE_SIZE = 36;
+ private static final int MESSAGE_SIZE = 36 + 1; // additional byte for
null flag of UUID value.
/** Full message size (with message type) in bytes. */
public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE +
DIRECT_TYPE_SIZE;
/** */
+ @Order(0)
private UUID nodeId;
/** */
+ @Order(value = 1, method = "received")
private long rcvCnt;
/** */
+ @Order(value = 2, method = "connectCount")
private long connectCnt;
/** */
+ @Order(value = 3, method = "connectionIndex")
private int connIdx;
/**
@@ -102,52 +101,38 @@ public class HandshakeMessage implements Message {
}
/**
- * @return Message size in bytes.
+ * @param connIdx Connection index.
*/
- public int getMessageSize() {
- return MESSAGE_FULL_SIZE;
+ public void connectionIndex(int connIdx) {
+ this.connIdx = connIdx;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- byte[] bytes = U.uuidToBytes(nodeId);
-
- assert bytes.length == 16 : bytes.length;
-
- buf.put(bytes);
-
- buf.putLong(rcvCnt);
-
- buf.putLong(connectCnt);
-
- buf.putInt(connIdx);
-
- return true;
+ /**
+ * @param connectCnt Connect count.
+ */
+ public void connectCount(long connectCnt) {
+ this.connectCnt = connectCnt;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
-
- buf.get(nodeIdBytes);
-
- nodeId = U.bytesToUuid(nodeIdBytes, 0);
-
- rcvCnt = buf.getLong();
-
- connectCnt = buf.getLong();
+ /**
+ * @param rcvCnt Number of received messages.
+ */
+ public void received(long rcvCnt) {
+ this.rcvCnt = rcvCnt;
+ }
- connIdx = buf.getInt();
+ /**
+ * @param nodeId Node ID.
+ */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
- return true;
+ /**
+ * @return Message size in bytes.
+ */
+ public int getMessageSize() {
+ return MESSAGE_FULL_SIZE;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
index 4c4f052e76d..b1f95a8fb92 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
@@ -17,17 +17,13 @@
package org.apache.ignite.spi.communication.tcp.messages;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE;
/**
- * Message requesting to wait until node's SPI context initialize.
+ * Message requesting to wait until node's SPI context initialize.
*/
public class HandshakeWaitMessage implements Message {
/** Full message size (with message type) in bytes. */
@@ -40,21 +36,6 @@ public class HandshakeWaitMessage implements Message {
// No-op.
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return HANDSHAKE_WAIT_MSG_TYPE;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
index ac935a8375b..8b28a129d68 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
@@ -17,29 +17,25 @@
package org.apache.ignite.spi.communication.tcp.messages;
-import java.nio.ByteBuffer;
import java.util.UUID;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
/**
* Node ID message.
*/
-@IgniteCodeGeneratingFail
public class NodeIdMessage implements Message {
/** Message body size (with message type) in bytes. */
- static final int MESSAGE_SIZE = 16;
+ static final int MESSAGE_SIZE = 1 + 16; // null flag, UUID value.
/** Full message size (with message type) in bytes. */
public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE +
DIRECT_TYPE_SIZE;
/** */
- private byte[] nodeIdBytes;
+ @Order(0)
+ private UUID nodeId;
/** */
public NodeIdMessage() {
@@ -52,42 +48,21 @@ public class NodeIdMessage implements Message {
public NodeIdMessage(UUID nodeId) {
assert nodeId != null;
- nodeIdBytes = U.uuidToBytes(nodeId);
-
- assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " +
MESSAGE_SIZE;
+ this.nodeId = nodeId;
}
/**
* @return Node ID bytes.
*/
- public byte[] nodeIdBytes() {
- return nodeIdBytes;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- assert nodeIdBytes.length == MESSAGE_SIZE;
-
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- buf.put(nodeIdBytes);
-
- return true;
+ public UUID nodeId() {
+ return nodeId;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- nodeIdBytes = new byte[MESSAGE_SIZE];
-
- buf.get(nodeIdBytes);
-
- return true;
+ /**
+ * @param nodeId Node ID bytes.
+ */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
index 3df65ddb57d..76c961ed0e1 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
@@ -17,18 +17,14 @@
package org.apache.ignite.spi.communication.tcp.messages;
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
/**
* Recovery acknowledgment message.
*/
-@IgniteCodeGeneratingFail
public class RecoveryLastReceivedMessage implements Message {
/** */
public static final long ALREADY_CONNECTED = -1;
@@ -49,6 +45,7 @@ public class RecoveryLastReceivedMessage implements Message {
public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE +
DIRECT_TYPE_SIZE;
/** */
+ @Order(value = 0, method = "received")
private long rcvCnt;
/**
@@ -72,26 +69,11 @@ public class RecoveryLastReceivedMessage implements Message
{
return rcvCnt;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- buf.putLong(rcvCnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- rcvCnt = buf.getLong();
-
- return true;
+ /**
+ * @param rcvCnt Number of received messages.
+ */
+ public void received(long rcvCnt) {
+ this.rcvCnt = rcvCnt;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
index 0657da4a940..6750812a6a1 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.managers.communication;
+import java.util.UUID;
import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -35,11 +35,8 @@ public class IgniteIoCommunicationMessageSerializationTest
extends AbstractCommu
/** {@inheritDoc} */
@Override protected Message initializeMessage(Message msg) throws
Exception {
- if (msg instanceof NodeIdMessage) {
- int msgSize = U.field(NodeIdMessage.class, "MESSAGE_SIZE");
-
- FieldUtils.writeField(msg, "nodeIdBytes", new byte[msgSize], true);
- }
+ if (msg instanceof NodeIdMessage)
+ FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true);
return msg;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
index 22ed899f373..46c0e85fbbb 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationHandshakeTimeoutTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.resource.DependencyResolver;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
+import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.communication.tcp.internal.TcpHandshakeExecutor;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -136,7 +137,7 @@ public class TcpCommunicationHandshakeTimeoutTest extends
GridCommonAbstractTest
/** {@inheritDoc} */
@Override public long tcpHandshake(SocketChannel ch, UUID rmtNodeId,
GridSslMeta sslMeta,
- HandshakeMessage msg) throws IgniteCheckedException {
+ HandshakeMessage msg, IgniteSpiContext spiCtx) throws
IgniteCheckedException {
if (needToDelayd.get()) {
needToDelayd.set(false);
@@ -144,7 +145,7 @@ public class TcpCommunicationHandshakeTimeoutTest extends
GridCommonAbstractTest
LockSupport.parkNanos(10_000_000);
}
- return delegate.tcpHandshake(ch, rmtNodeId, sslMeta, msg);
+ return delegate.tcpHandshake(ch, rmtNodeId, sslMeta, msg, spiCtx);
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 7f5d876e21f..bd14ef0e300 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -543,11 +543,11 @@ public class GridSpiTestContext implements
IgniteSpiContext {
@Override public MessageFormatter messageFormatter() {
if (formatter == null) {
formatter = new MessageFormatter() {
- @Override public MessageWriter writer(UUID rmtNodeId,
MessageFactory msgFactory) {
+ @Override public MessageWriter writer(MessageFactory
msgFactory) {
return new DirectMessageWriter(msgFactory);
}
- @Override public MessageReader reader(UUID rmtNodeId,
MessageFactory msgFactory) {
+ @Override public MessageReader reader(MessageFactory
msgFactory) {
return new DirectMessageReader(msgFactory, null);
}
};