This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e7f9a7a95d IGNITE-28290 Added utility discovery collection message 
(#12890)
2e7f9a7a95d is described below

commit 2e7f9a7a95d02fc9d92e2fabbce1633e90697389
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Mar 19 14:27:16 2026 +0300

    IGNITE-28290 Added utility discovery collection message (#12890)
---
 .../discovery/DiscoveryMessageFactory.java         |  10 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  21 ++-
 .../TcpDiscoveryClientReconnectMessage.java        |  40 ++----
 .../messages/TcpDiscoveryCollectionMessage.java    | 143 +++++++++++++++++++++
 .../tcp/messages/TcpDiscoveryNodeAddedMessage.java |  29 +----
 5 files changed, 182 insertions(+), 61 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 02316cc3784..26f744bac52 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -111,7 +111,9 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingReques
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
-import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageMarshallableSerializer;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageMarshallableSerializer;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -172,6 +174,9 @@ public class DiscoveryMessageFactory implements 
MessageFactoryProvider {
 
     /** {@inheritDoc} */
     @Override public void registerAll(MessageFactory factory) {
+        factory.register((short)-200, TcpDiscoveryCollectionMessage::new,
+            new 
TcpDiscoveryCollectionMessageMarshallableSerializer(cstDataMarshall, 
cstDataMarshallClsLdr));
+
         factory.register((short)-115, SchemaAlterTableAddColumnOperation::new,
             new SchemaAlterTableAddColumnOperationSerializer());
         factory.register((short)-114, SchemaIndexCreateOperation::new,
@@ -228,8 +233,7 @@ public class DiscoveryMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)25, 
DistributedMetaStorageUpdateAckMessage::new, new 
DistributedMetaStorageUpdateAckMessageSerializer());
         factory.register((short)26, DistributedMetaStorageCasMessage::new, new 
DistributedMetaStorageCasMessageSerializer());
         factory.register((short)27, DistributedMetaStorageCasAckMessage::new, 
new DistributedMetaStorageCasAckMessageSerializer());
-        factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
-            new 
TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall, 
cstDataMarshallClsLdr));
+        factory.register((short)28, TcpDiscoveryClientReconnectMessage::new, 
new TcpDiscoveryClientReconnectMessageSerializer());
         factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
             new 
TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall, 
cstDataMarshallClsLdr));
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9006c05df58..8662d940e7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1879,18 +1879,27 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 nodeAddedMsg.topology(topToSnd);
 
-                Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
-
                 if (msgs != null) {
-                    msgs0 = new ArrayList<>(msgs.size());
+                    Collection<TcpDiscoveryAbstractMessage> msgs0 = new 
ArrayList<>(msgs.size());
 
                     for (PendingMessage pendingMsg : msgs) {
-                        if (pendingMsg.msg != null)
+                        if (pendingMsg.msg == null)
+                            continue;
+
+                        if (pendingMsg.msg == nodeAddedMsg) {
+                            TcpDiscoveryNodeAddedMessage msg0 = 
(TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
+                            msg0 = new TcpDiscoveryNodeAddedMessage(msg0);
+                            // Removes message self-inclusion and prevents 
infinite write/read message cycles and stack overflow.
+                            msg0.messages(null);
+
+                            msgs0.add(msg0);
+                        }
+                        else
                             msgs0.add(pendingMsg.msg);
                     }
-                }
 
-                nodeAddedMsg.messages(msgs0);
+                    nodeAddedMsg.messages(msgs0);
+                }
 
                 Map<Long, Collection<ClusterNode>> hist;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
index 1e38accc112..e76c9ea182a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
@@ -18,23 +18,22 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Message telling that client node is reconnecting to topology.
  */
 @TcpDiscoveryEnsureDelivery
-public class TcpDiscoveryClientReconnectMessage extends 
TcpDiscoveryAbstractMessage implements MarshallableMessage {
+public class TcpDiscoveryClientReconnectMessage extends 
TcpDiscoveryAbstractMessage implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -46,16 +45,9 @@ public class TcpDiscoveryClientReconnectMessage extends 
TcpDiscoveryAbstractMess
     @Order(1)
     IgniteUuid lastMsgId;
 
-    /** Pending messages. */
-    @GridToStringExclude
-    private Collection<TcpDiscoveryAbstractMessage> msgs;
-
-    /**
-     * TODO: Use direct messages or a message container after 
https://issues.apache.org/jira/browse/IGNITE-25883
-     * Srialized bytes of {@link #msgs}.
-     */
+    /** Pending messages holder. */
     @Order(2)
-    byte[] msgsBytes;
+    @Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;
 
     /** Constructor for {@link DiscoveryMessageFactory}. */
     public TcpDiscoveryClientReconnectMessage() {
@@ -91,15 +83,15 @@ public class TcpDiscoveryClientReconnectMessage extends 
TcpDiscoveryAbstractMess
     /**
      * @param msgs Pending messages.
      */
-    public void pendingMessages(Collection<TcpDiscoveryAbstractMessage> msgs) {
-        this.msgs = msgs;
+    public void pendingMessages(@Nullable 
Collection<TcpDiscoveryAbstractMessage> msgs) {
+        pendingMsgsMsg = F.isEmpty(msgs) ? null : new 
TcpDiscoveryCollectionMessage(msgs);
     }
 
     /**
      * @return Pending messages.
      */
     public Collection<TcpDiscoveryAbstractMessage> pendingMessages() {
-        return msgs;
+        return pendingMsgsMsg == null ? Collections.emptyList() : 
pendingMsgsMsg.messages();
     }
 
     /**
@@ -131,18 +123,6 @@ public class TcpDiscoveryClientReconnectMessage extends 
TcpDiscoveryAbstractMess
             Objects.equals(lastMsgId, other.lastMsgId);
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
-        if (msgs != null)
-            msgsBytes = U.marshal(marsh, msgs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
-        if (msgsBytes != null)
-            msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
-    }
-
     /** {@inheritDoc} */
     @Override public short directType() {
         return 28;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java
new file mode 100644
index 00000000000..7a01679666f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883
+ * Message to transfer a collection of {@link TcpDiscoveryAbstractMessage} 
with the original order.
+ * Some of them may be a {@link Message}, while the others are not and require the 
original marshalling.
+ */
+public class TcpDiscoveryCollectionMessage implements MarshallableMessage {
+    /** {@link TcpDiscoveryAbstractMessage} messages which are a {@link 
Message}. */
+    @Order(0)
+    @Nullable Map<Integer, Message> writableMsgs;
+
+    /** Marshallable or Java-serializable messages which are not a {@link 
Message}. */
+    @Nullable private Map<Integer, TcpDiscoveryAbstractMessage> 
marshallableMsgs;
+
+    /** Marshalled {@link #marshallableMsgs}. */
+    @Order(1)
+    @GridToStringExclude
+    @Nullable byte[] marshallableMsgsBytes;
+
+    /** Constructor for {@link DiscoveryMessageFactory}. */
+    public TcpDiscoveryCollectionMessage() {
+        // No-op.
+    }
+
+    /** @param msgs Discovery messages to hold. */
+    public 
TcpDiscoveryCollectionMessage(Collection<TcpDiscoveryAbstractMessage> msgs) {
+        if (F.isEmpty(msgs))
+            return;
+
+        // Keeps the original message order.
+        int idx = 0;
+
+        for (TcpDiscoveryAbstractMessage m : msgs) {
+            if (m instanceof Message) {
+                if (writableMsgs == null)
+                    writableMsgs = U.newHashMap(msgs.size());
+
+                writableMsgs.put(idx++, (Message)m);
+
+                continue;
+            }
+
+            if (marshallableMsgs == null)
+                marshallableMsgs = U.newHashMap(msgs.size());
+
+            marshallableMsgs.put(idx++, m);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
+        if (marshallableMsgs != null)
+            marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
+        if (marshallableMsgsBytes != null)
+            marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, 
clsLdr);
+
+        marshallableMsgsBytes = null;
+    }
+
+    /**
+     * Gets the held discovery messages restored to their original order.
+     *
+     * @return Held messages in the original order, or an empty collection if there are none.
+     */
+    public Collection<TcpDiscoveryAbstractMessage> messages() {
+        if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs))
+            return Collections.emptyList();
+
+        int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size())
+            + (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size());
+
+        List<TcpDiscoveryAbstractMessage> res = new ArrayList<>(totalSz);
+
+        for (int i = 0; i < totalSz; ++i) {
+            Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i);
+
+            if (m == null) {
+                TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i);
+
+                assert adm != null;
+
+                res.add(adm);
+            }
+            else {
+                assert marshallableMsgs == null || marshallableMsgs.get(i) == 
null;
+                assert m instanceof TcpDiscoveryAbstractMessage;
+
+                res.add((TcpDiscoveryAbstractMessage)m);
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -200;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", 
super.toString());
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index e4c6bddb008..bdb75e19783 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
@@ -58,16 +59,9 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
     @Order(1)
     DiscoveryDataPacket dataPacket;
 
-    /** Pending messages from previous node. */
-    private Collection<TcpDiscoveryAbstractMessage> msgs;
-
-    /**
-     * TODO: Use direct messages or a message container after 
https://issues.apache.org/jira/browse/IGNITE-25883
-     * Marshalled {@link #msgs}.
-     */
+    /** Pending messages container. */
     @Order(2)
-    @GridToStringExclude
-    byte[] msgsBytes;
+    @Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;
 
     /** Current topology. Initialized by coordinator. */
     @GridToStringInclude
@@ -131,8 +125,7 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
 
         node = msg.node;
         nodeBytes = msg.nodeBytes;
-        msgs = msg.msgs;
-        msgsBytes = msg.msgsBytes;
+        pendingMsgsMsg = msg.pendingMsgsMsg;
         top = msg.top;
         topBytes = msg.topBytes;
         clientTop = msg.clientTop;
@@ -156,8 +149,8 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
      *
      * @return Pending messages from previous node.
      */
-    @Nullable public Collection<TcpDiscoveryAbstractMessage> messages() {
-        return msgs;
+    public @Nullable Collection<TcpDiscoveryAbstractMessage> messages() {
+        return pendingMsgsMsg == null ? null : pendingMsgsMsg.messages();
     }
 
     /**
@@ -166,8 +159,7 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
      * @param msgs Pending messages to send to new node.
      */
     public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> 
msgs) {
-        this.msgs = msgs;
-        msgsBytes = null;
+        pendingMsgsMsg = F.isEmpty(msgs) ? null : new 
TcpDiscoveryCollectionMessage(msgs);
     }
 
     /**
@@ -259,9 +251,6 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
         if (node != null)
             nodeBytes = U.marshal(marsh, node);
 
-        if (msgs != null)
-            msgsBytes = U.marshal(marsh, msgs);
-
         if (top != null)
             topBytes = U.marshal(marsh, top);
 
@@ -274,9 +263,6 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
         if (nodeBytes != null)
             node = U.unmarshal(marsh, nodeBytes, clsLdr);
 
-        if (msgsBytes != null)
-            msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
-
         if (topBytes != null)
             top = U.unmarshal(marsh, topBytes, clsLdr);
 
@@ -286,7 +272,6 @@ public class TcpDiscoveryNodeAddedMessage extends 
TcpDiscoveryAbstractTraceableM
         nodeBytes = null;
         topBytes = null;
         topHistBytes = null;
-        msgsBytes = null;
     }
 
 

Reply via email to