This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2e7f9a7a95d IGNITE-28290 Added utility discovery collection message
(#12890)
2e7f9a7a95d is described below
commit 2e7f9a7a95d02fc9d92e2fabbce1633e90697389
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Mar 19 14:27:16 2026 +0300
IGNITE-28290 Added utility discovery collection message (#12890)
---
.../discovery/DiscoveryMessageFactory.java | 10 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 21 ++-
.../TcpDiscoveryClientReconnectMessage.java | 40 ++----
.../messages/TcpDiscoveryCollectionMessage.java | 143 +++++++++++++++++++++
.../tcp/messages/TcpDiscoveryNodeAddedMessage.java | 29 +----
5 files changed, 182 insertions(+), 61 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 02316cc3784..26f744bac52 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -111,7 +111,9 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingReques
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageMarshallableSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageMarshallableSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -172,6 +174,9 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
+ factory.register((short)-200, TcpDiscoveryCollectionMessage::new,
+ new
TcpDiscoveryCollectionMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
+
factory.register((short)-115, SchemaAlterTableAddColumnOperation::new,
new SchemaAlterTableAddColumnOperationSerializer());
factory.register((short)-114, SchemaIndexCreateOperation::new,
@@ -228,8 +233,7 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)25,
DistributedMetaStorageUpdateAckMessage::new, new
DistributedMetaStorageUpdateAckMessageSerializer());
factory.register((short)26, DistributedMetaStorageCasMessage::new, new
DistributedMetaStorageCasMessageSerializer());
factory.register((short)27, DistributedMetaStorageCasAckMessage::new,
new DistributedMetaStorageCasAckMessageSerializer());
- factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
- new
TcpDiscoveryClientReconnectMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
+ factory.register((short)28, TcpDiscoveryClientReconnectMessage::new,
new TcpDiscoveryClientReconnectMessageSerializer());
factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
new
TcpDiscoveryNodeAddedMessageMarshallableSerializer(cstDataMarshall,
cstDataMarshallClsLdr));
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9006c05df58..8662d940e7b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1879,18 +1879,27 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(topToSnd);
- Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
-
if (msgs != null) {
- msgs0 = new ArrayList<>(msgs.size());
+ Collection<TcpDiscoveryAbstractMessage> msgs0 = new
ArrayList<>(msgs.size());
for (PendingMessage pendingMsg : msgs) {
- if (pendingMsg.msg != null)
+ if (pendingMsg.msg == null)
+ continue;
+
+ if (pendingMsg.msg == nodeAddedMsg) {
+ TcpDiscoveryNodeAddedMessage msg0 =
(TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
+ msg0 = new TcpDiscoveryNodeAddedMessage(msg0);
+ // Removes message self-inclusion and prevents
infinite write/read message cycles and stack overflow.
+ msg0.messages(null);
+
+ msgs0.add(msg0);
+ }
+ else
msgs0.add(pendingMsg.msg);
}
- }
- nodeAddedMsg.messages(msgs0);
+ nodeAddedMsg.messages(msgs0);
+ }
Map<Long, Collection<ClusterNode>> hist;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
index 1e38accc112..e76c9ea182a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
@@ -18,23 +18,22 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Collection;
+import java.util.Collections;
import java.util.Objects;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
/**
* Message telling that client node is reconnecting to topology.
*/
@TcpDiscoveryEnsureDelivery
-public class TcpDiscoveryClientReconnectMessage extends
TcpDiscoveryAbstractMessage implements MarshallableMessage {
+public class TcpDiscoveryClientReconnectMessage extends
TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
@@ -46,16 +45,9 @@ public class TcpDiscoveryClientReconnectMessage extends
TcpDiscoveryAbstractMess
@Order(1)
IgniteUuid lastMsgId;
- /** Pending messages. */
- @GridToStringExclude
- private Collection<TcpDiscoveryAbstractMessage> msgs;
-
- /**
- * TODO: Use direct messages or a message container after
https://issues.apache.org/jira/browse/IGNITE-25883
- * Srialized bytes of {@link #msgs}.
- */
+ /** Pending messages holder. */
@Order(2)
- byte[] msgsBytes;
+ @Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;
/** Constructor for {@link DiscoveryMessageFactory}. */
public TcpDiscoveryClientReconnectMessage() {
@@ -91,15 +83,15 @@ public class TcpDiscoveryClientReconnectMessage extends
TcpDiscoveryAbstractMess
/**
* @param msgs Pending messages.
*/
- public void pendingMessages(Collection<TcpDiscoveryAbstractMessage> msgs) {
- this.msgs = msgs;
+ public void pendingMessages(@Nullable
Collection<TcpDiscoveryAbstractMessage> msgs) {
+ pendingMsgsMsg = F.isEmpty(msgs) ? null : new
TcpDiscoveryCollectionMessage(msgs);
}
/**
* @return Pending messages.
*/
public Collection<TcpDiscoveryAbstractMessage> pendingMessages() {
- return msgs;
+ return pendingMsgsMsg == null ? Collections.emptyList() :
pendingMsgsMsg.messages();
}
/**
@@ -131,18 +123,6 @@ public class TcpDiscoveryClientReconnectMessage extends
TcpDiscoveryAbstractMess
Objects.equals(lastMsgId, other.lastMsgId);
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
- if (msgs != null)
- msgsBytes = U.marshal(marsh, msgs);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
- if (msgsBytes != null)
- msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 28;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java
new file mode 100644
index 00000000000..7a01679666f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883
+ * Message to transfer a collection of {@link TcpDiscoveryAbstractMessage}
with the original order.
+ * Several of them might be a {@link Message}, several may not and require the
original marshalling.
+ */
+public class TcpDiscoveryCollectionMessage implements MarshallableMessage {
+ /** {@link TcpDiscoveryAbstractMessage} messages which are a {@link
Message}. */
+ @Order(0)
+ @Nullable Map<Integer, Message> writableMsgs;
+
+ /** Marshallable or Java-serializable messages which are not a {@link
Message}. */
+ @Nullable private Map<Integer, TcpDiscoveryAbstractMessage>
marshallableMsgs;
+
+ /** Marshalled {@link #marshallableMsgs}. */
+ @Order(1)
+ @GridToStringExclude
+ @Nullable byte[] marshallableMsgsBytes;
+
+ /** Constructor for {@link DiscoveryMessageFactory}. */
+ public TcpDiscoveryCollectionMessage() {
+ // No-op.
+ }
+
+ /** @param msgs Discovery messages to hold. */
+ public
TcpDiscoveryCollectionMessage(Collection<TcpDiscoveryAbstractMessage> msgs) {
+ if (F.isEmpty(msgs))
+ return;
+
+ // Keeps the original message order.
+ int idx = 0;
+
+ for (TcpDiscoveryAbstractMessage m : msgs) {
+ if (m instanceof Message) {
+ if (writableMsgs == null)
+ writableMsgs = U.newHashMap(msgs.size());
+
+ writableMsgs.put(idx++, (Message)m);
+
+ continue;
+ }
+
+ if (marshallableMsgs == null)
+ marshallableMsgs = U.newHashMap(msgs.size());
+
+ marshallableMsgs.put(idx++, m);
+ }
+ }
+
+ /** @param marsh marshaller. */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (marshallableMsgs != null)
+ marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (marshallableMsgsBytes != null)
+ marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes,
clsLdr);
+
+ marshallableMsgsBytes = null;
+ }
+
+ /**
+ * Gets pending messages sent to new node by its previous.
+ *
+ * @return Pending messages from previous node.
+ */
+ public Collection<TcpDiscoveryAbstractMessage> messages() {
+ if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs))
+ return Collections.emptyList();
+
+ int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size())
+ + (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size());
+
+ List<TcpDiscoveryAbstractMessage> res = new ArrayList<>(totalSz);
+
+ for (int i = 0; i < totalSz; ++i) {
+ Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i);
+
+ if (m == null) {
+ TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i);
+
+ assert adm != null;
+
+ res.add(adm);
+ }
+ else {
+ assert marshallableMsgs == null || marshallableMsgs.get(i) ==
null;
+ assert m instanceof TcpDiscoveryAbstractMessage;
+
+ res.add((TcpDiscoveryAbstractMessage)m);
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -200;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryCollectionMessage.class, this, "super",
super.toString());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index e4c6bddb008..bdb75e19783 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
@@ -58,16 +59,9 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
@Order(1)
DiscoveryDataPacket dataPacket;
- /** Pending messages from previous node. */
- private Collection<TcpDiscoveryAbstractMessage> msgs;
-
- /**
- * TODO: Use direct messages or a message container after
https://issues.apache.org/jira/browse/IGNITE-25883
- * Marshalled {@link #msgs}.
- */
+ /** Pending messages containner. */
@Order(2)
- @GridToStringExclude
- byte[] msgsBytes;
+ @Nullable TcpDiscoveryCollectionMessage pendingMsgsMsg;
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
@@ -131,8 +125,7 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
node = msg.node;
nodeBytes = msg.nodeBytes;
- msgs = msg.msgs;
- msgsBytes = msg.msgsBytes;
+ pendingMsgsMsg = msg.pendingMsgsMsg;
top = msg.top;
topBytes = msg.topBytes;
clientTop = msg.clientTop;
@@ -156,8 +149,8 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*
* @return Pending messages from previous node.
*/
- @Nullable public Collection<TcpDiscoveryAbstractMessage> messages() {
- return msgs;
+ public @Nullable Collection<TcpDiscoveryAbstractMessage> messages() {
+ return pendingMsgsMsg == null ? null : pendingMsgsMsg.messages();
}
/**
@@ -166,8 +159,7 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
* @param msgs Pending messages to send to new node.
*/
public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage>
msgs) {
- this.msgs = msgs;
- msgsBytes = null;
+ pendingMsgsMsg = F.isEmpty(msgs) ? null : new
TcpDiscoveryCollectionMessage(msgs);
}
/**
@@ -259,9 +251,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
if (node != null)
nodeBytes = U.marshal(marsh, node);
- if (msgs != null)
- msgsBytes = U.marshal(marsh, msgs);
-
if (top != null)
topBytes = U.marshal(marsh, top);
@@ -274,9 +263,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
if (nodeBytes != null)
node = U.unmarshal(marsh, nodeBytes, clsLdr);
- if (msgsBytes != null)
- msgs = U.unmarshal(marsh, msgsBytes, clsLdr);
-
if (topBytes != null)
top = U.unmarshal(marsh, topBytes, clsLdr);
@@ -286,7 +272,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
nodeBytes = null;
topBytes = null;
topHistBytes = null;
- msgsBytes = null;
}