This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e923f923690 IGNITE-27810 Use MessageSerializer for
TcpDiscoveryJoinRequestMessage (#12724)
e923f923690 is described below
commit e923f923690496315c51f7eac2123ec2294f9d72
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue Feb 24 14:27:10 2026 +0500
IGNITE-27810 Use MessageSerializer for TcpDiscoveryJoinRequestMessage
(#12724)
---
.../discovery/DiscoveryMessageFactory.java | 3 ++
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 7 +++
.../messages/TcpDiscoveryJoinRequestMessage.java | 61 ++++++++++++++++++++--
4 files changed, 69 insertions(+), 4 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index d188ac50704..a225c1abec7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -53,6 +53,8 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequestSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponseSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
@@ -110,5 +112,6 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new
TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new
TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new,
new TcpDiscoveryNodeAddFinishedMessageSerializer());
+ factory.register((short)20, TcpDiscoveryJoinRequestMessage::new, new
TcpDiscoveryJoinRequestMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 2e96845ff03..dfa7d8cfeef 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -787,6 +787,8 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryJoinRequestMessage joinReqMsg = new
TcpDiscoveryJoinRequestMessage(node, discoveryData);
+ joinReqMsg.prepareMarshal(spi.marshaller());
+
TcpDiscoveryNode nodef = node;
joinReqMsg.spanContainer().span(
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e6d3052e902..fa69db0cde1 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1123,6 +1123,8 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryJoinRequestMessage joinReqMsg = new
TcpDiscoveryJoinRequestMessage(locNode, discoveryData);
+ joinReqMsg.prepareMarshal(spi.marshaller());
+
joinReqMsg.spanContainer().span(
tracing.create(TraceableMessagesTable.traceName(joinReqMsg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
locNode.id().toString())
@@ -3305,6 +3307,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);
+ if (msg instanceof TcpDiscoveryJoinRequestMessage)
+
((TcpDiscoveryJoinRequestMessage)msg).prepareMarshal(spi.marshaller());
+
sendMessageToClients(msg);
List<TcpDiscoveryNode> failedNodes;
@@ -6955,6 +6960,8 @@ class ServerImpl extends TcpDiscoveryImpl {
else if (msg instanceof
TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage req =
(TcpDiscoveryJoinRequestMessage)msg;
+ req.finishUnmarshal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
+
// Current node holds connection with the node
that is joining the cluster. Therefore, it can
// save certificates with which the connection was
established to joining node attributes.
if (spi.nodeAuth != null &&
nodeId.equals(req.node().id()))
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index b3b4e3ae9a0..33b46585b43 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -17,7 +17,13 @@
package org.apache.ignite.spi.discovery.tcp.messages;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -27,15 +33,26 @@ import static
org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
* Initial message sent by a node that wants to enter topology.
* Sent to random node during SPI start. Then forwarded directly to
coordinator.
*/
-public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceableMessage {
+public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceableMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
/** New node that wants to join the topology. */
- private final TcpDiscoveryNode node;
+ private TcpDiscoveryNode node;
+
+ /** Serialized {@link #node}. */
+ // TODO Remove the field after completing
https://issues.apache.org/jira/browse/IGNITE-27899.
+ @Order(6)
+ byte[] nodeBytes;
/** Discovery data container. */
- private final DiscoveryDataPacket dataPacket;
+ @Order(7)
+ DiscoveryDataPacket dataPacket;
+
+ /** Constructor. */
+ public TcpDiscoveryJoinRequestMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -65,7 +82,7 @@ public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceabl
}
/**
- * @return {@code true} flag.
+ * @return Responded flag.
*/
public boolean responded() {
return getFlag(RESPONDED_FLAG_POS);
@@ -78,6 +95,37 @@ public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceabl
setFlag(RESPONDED_FLAG_POS, responded);
}
+ /**
+ * @param marsh Marshaller.
+ */
+ public void prepareMarshal(Marshaller marsh) {
+ if (node != null && nodeBytes == null) {
+ try {
+ nodeBytes = U.marshal(marsh, node);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal TcpDiscoveryNode
object", e);
+ }
+ }
+ }
+
+ /**
+ * @param marsh Marshaller.
+ * @param clsLdr Class loader.
+ */
+ public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) {
+ if (nodeBytes != null && node == null) {
+ try {
+ node = U.unmarshal(marsh, nodeBytes, clsLdr);
+
+ nodeBytes = null;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to unmarshal
TcpDiscoveryNode object", e);
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
// NOTE!
@@ -95,4 +143,9 @@ public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceabl
@Override public String toString() {
return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super",
super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 20;
+ }
}