This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 04c41beca34 IGNITE-27556 : MessageSerializer for
TcpDiscoveryNodeAddedMessage v2 simplified (#12790)
04c41beca34 is described below
commit 04c41beca34da8c81c96266442d44730872a8e4c
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Mar 17 16:52:11 2026 +0300
IGNITE-27556 : MessageSerializer for TcpDiscoveryNodeAddedMessage v2
simplified (#12790)
---
.../discovery/DiscoveryMessageFactory.java | 4 +
.../TcpDiscoveryClientReconnectMessage.java | 5 +-
.../TcpDiscoveryNodeAddFinishedMessage.java | 3 +
.../tcp/messages/TcpDiscoveryNodeAddedMessage.java | 117 ++++++++++++++++++---
4 files changed, 112 insertions(+), 17 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index f289b84f4a9..02316cc3784 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -132,6 +132,8 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMes
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessageMarshallableSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessageMarshallableSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
@@ -228,6 +230,8 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)27, DistributedMetaStorageCasAckMessage::new,
new DistributedMetaStorageCasAckMessageSerializer());
factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
new
TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
+ factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
+ new
TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new,
new CacheStatisticsModeChangeMessageSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
index 3eaf562e5a7..1e38accc112 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
@@ -50,7 +50,10 @@ public class TcpDiscoveryClientReconnectMessage extends
TcpDiscoveryAbstractMess
@GridToStringExclude
private Collection<TcpDiscoveryAbstractMessage> msgs;
- /** Srialized bytes of {@link #msgs}. */
+ /**
+ * TODO: Use direct messages or a message container after
https://issues.apache.org/jira/browse/IGNITE-25883
+ * Serialized bytes of {@link #msgs}.
+ */
@Order(2)
byte[] msgsBytes;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 451689eabe0..c11bee62e85 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -123,6 +123,7 @@ public class TcpDiscoveryNodeAddFinishedMessage extends
TcpDiscoveryAbstractTrac
*/
public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) {
this.clientNodeAttrs = clientNodeAttrs;
+ clientNodeAttrsBytes = null;
}
/** {@inheritDoc} */
@@ -135,6 +136,8 @@ public class TcpDiscoveryNodeAddFinishedMessage extends
TcpDiscoveryAbstractTrac
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
if (clientNodeAttrsBytes != null)
clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr);
+
+ clientNodeAttrsBytes = null;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 36540d8b7df..e4c6bddb008 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -20,37 +20,64 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;
/**
+ * TODO: Use NodeMessage for {@link TcpDiscoveryNode} and {@link ClusterNode}
after https://issues.apache.org/jira/browse/IGNITE-27899
* Message telling nodes that new node should be added to topology.
* When newly added node receives the message it connects to its next and
finishes
* join process.
*/
@TcpDiscoveryEnsureDelivery
@TcpDiscoveryRedirectToClient
-public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableMessage {
+public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage {
/** */
private static final long serialVersionUID = 0L;
/** Added node. */
- private final TcpDiscoveryNode node;
+ private TcpDiscoveryNode node;
+
+ /** Marshalled {@link #node}. */
+ @Order(0)
+ @GridToStringExclude
+ byte[] nodeBytes;
/** */
- private DiscoveryDataPacket dataPacket;
+ @Order(1)
+ DiscoveryDataPacket dataPacket;
/** Pending messages from previous node. */
private Collection<TcpDiscoveryAbstractMessage> msgs;
+ /**
+ * TODO: Use direct messages or a message container after
https://issues.apache.org/jira/browse/IGNITE-25883
+ * Marshalled {@link #msgs}.
+ */
+ @Order(2)
+ @GridToStringExclude
+ byte[] msgsBytes;
+
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
+ /** Marshalled {@link #top}. */
+ @Order(3)
+ @GridToStringExclude
+ @Nullable byte[] topBytes;
+
/** */
@GridToStringInclude
private transient Collection<TcpDiscoveryNode> clientTop;
@@ -58,8 +85,19 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
/** Topology snapshots history. */
private Map<Long, Collection<ClusterNode>> topHist;
+ /** Marshalled {@link #topHist}. */
+ @Order(4)
+ @GridToStringExclude
+ @Nullable byte[] topHistBytes;
+
/** Start time of the first grid node. */
- private final long gridStartTime;
+ @Order(5)
+ long gridStartTime;
+
+ /** Constructor for {@link DiscoveryMessageFactory}. */
+ public TcpDiscoveryNodeAddedMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -69,7 +107,8 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
* @param dataPacket container for collecting discovery data across the
cluster.
* @param gridStartTime Start time of the first grid node.
*/
- public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId,
+ public TcpDiscoveryNodeAddedMessage(
+ UUID creatorNodeId,
TcpDiscoveryNode node,
DiscoveryDataPacket dataPacket,
long gridStartTime
@@ -90,13 +129,17 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
super(msg);
- this.node = msg.node;
- this.msgs = msg.msgs;
- this.top = msg.top;
- this.clientTop = msg.clientTop;
- this.topHist = msg.topHist;
- this.dataPacket = msg.dataPacket;
- this.gridStartTime = msg.gridStartTime;
+ node = msg.node;
+ nodeBytes = msg.nodeBytes;
+ msgs = msg.msgs;
+ msgsBytes = msg.msgsBytes;
+ top = msg.top;
+ topBytes = msg.topBytes;
+ clientTop = msg.clientTop;
+ topHist = msg.topHist;
+ topHistBytes = msg.topHistBytes;
+ dataPacket = msg.dataPacket;
+ gridStartTime = msg.gridStartTime;
}
/**
@@ -122,10 +165,9 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*
* @param msgs Pending messages to send to new node.
*/
- public void messages(
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs
- ) {
+ public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage>
msgs) {
this.msgs = msgs;
+ msgsBytes = null;
}
/**
@@ -144,6 +186,7 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*/
public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
this.top = top;
+ topBytes = null;
}
/**
@@ -152,7 +195,7 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
public void clientTopology(Collection<TcpDiscoveryNode> top) {
assert top != null && !top.isEmpty() : top;
- this.clientTop = top;
+ clientTop = top;
}
/**
@@ -178,6 +221,7 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*/
public void topologyHistory(@Nullable Map<Long, Collection<ClusterNode>>
topHist) {
this.topHist = topHist;
+ topHistBytes = null;
}
/**
@@ -210,6 +254,47 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
return gridStartTime;
}
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (node != null)
+ nodeBytes = U.marshal(marsh, node);
+
+ if (msgs != null)
+ msgsBytes = U.marshal(marsh, msgs);
+
+ if (top != null)
+ topBytes = U.marshal(marsh, top);
+
+ if (topHist != null)
+ topHistBytes = U.marshal(marsh, topHist);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (nodeBytes != null)
+ node = U.unmarshal(marsh, nodeBytes, clsLdr);
+
+ if (msgsBytes != null)
+ msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
+
+ if (topBytes != null)
+ top = U.unmarshal(marsh, topBytes, clsLdr);
+
+ if (topHistBytes != null)
+ topHist = U.unmarshal(marsh, topHistBytes, clsLdr);
+
+ nodeBytes = null;
+ topBytes = null;
+ topHistBytes = null;
+ msgsBytes = null;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 29;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super",
super.toString());