This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8a26b182415 IGNITE-27899 : Make TcpDiscoveryNode implement message v2 (#12955)
8a26b182415 is described below
commit 8a26b182415f0415f0f03094c54cb4c611e9a1c0
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Mar 30 19:57:41 2026 +0300
IGNITE-27899 : Make TcpDiscoveryNode implement message v2 (#12955)
---
.../internal/MessageSerializerGenerator.java | 2 +-
.../ignite/internal/ClusterMetricsSnapshot.java | 3 +
.../discovery/DiscoveryMessageFactory.java | 6 ++
.../managers/discovery/GridDiscoveryManager.java | 13 +---
.../apache/ignite/lang/IgniteProductVersion.java | 30 +++++---
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 34 +++++++--
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 26 +++----
.../discovery/tcp/internal/TcpDiscoveryNode.java | 82 ++++++++++++++++++----
.../messages/TcpDiscoveryJoinRequestMessage.java | 23 +-----
.../TcpDiscoveryNodeAddFinishedMessage.java | 3 +-
.../tcp/messages/TcpDiscoveryNodeAddedMessage.java | 65 +++--------------
.../affinity/GridAffinityAssignmentV2Test.java | 9 ++-
.../TcpDiscoveryDeadNodeAddressResolvingTest.java | 7 +-
.../junits/multijvm/IgniteProcessProxy.java | 2 +-
15 files changed, 172 insertions(+), 135 deletions(-)
diff --git
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index fef259eec6a..893a1ccaa48 100644
---
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -724,7 +724,7 @@ public class MessageSerializerGenerator {
String descName = field.getSimpleName() + "CollDesc";
String typeName = desc.substring(desc.indexOf(' ') + 1,
desc.indexOf('('));
- fields.add("private final static " + typeName + " " + descName + " = "
+ desc + ";");
+ fields.add("private static final " + typeName + " " + descName + " = "
+ desc + ";");
return descName;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
index 60484f3280a..4d2795abffa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
@@ -109,6 +109,9 @@ public class ClusterMetricsSnapshot implements
ClusterMetrics {
* Creates snapshot based on the handled message.
*/
public ClusterMetricsSnapshot(NodeMetricsMessage m) {
+ // As in #deserialize().
+ m.lastUpdateTime = U.currentTimeMillis();
+
this.m = m;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 52c1091d054..a61c994b7fb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -129,6 +129,8 @@ import
org.apache.ignite.internal.util.distributed.FullMessage;
import org.apache.ignite.internal.util.distributed.FullMessageSerializer;
import org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.distributed.InitMessageSerializer;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteProductVersionSerializer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -136,6 +138,8 @@ import
org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDisc
import
org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import
org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacketSerializer;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import
org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodeMarshallableSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
@@ -223,6 +227,8 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register(-200, TcpDiscoveryCollectionMessage::new,
new TcpDiscoveryCollectionMessageMarshallableSerializer(marsh,
clsLdr));
+ factory.register(-117, TcpDiscoveryNode::new, new
TcpDiscoveryNodeMarshallableSerializer(marsh, clsLdr));
+ factory.register(-116, IgniteProductVersion::new, new
IgniteProductVersionSerializer());
factory.register(-115, SchemaAlterTableAddColumnOperation::new, new
SchemaAlterTableAddColumnOperationSerializer());
factory.register(-114, SchemaIndexCreateOperation::new, new
SchemaIndexCreateOperationMarshallableSerializer(marsh, clsLdr));
factory.register(-113, SchemaIndexDropOperation::new, new
SchemaIndexDropOperationSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8c5564616d4..91825fd876a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -363,7 +363,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
*/
public void addCacheGroup(CacheGroupDescriptor grpDesc,
IgnitePredicate<ClusterNode> filter, CacheMode cacheMode) {
CacheGroupAffinity old = registeredCacheGrps.put(grpDesc.groupId(),
- new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter,
cacheMode, grpDesc.persistenceEnabled()));
+ new CacheGroupAffinity(filter, cacheMode,
grpDesc.persistenceEnabled()));
assert old == null : old;
}
@@ -3389,9 +3389,6 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
*
*/
private static class CacheGroupAffinity {
- /** */
- private final String name;
-
/** Nodes filter. */
private final IgnitePredicate<ClusterNode> cacheFilter;
@@ -3402,17 +3399,11 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
private final boolean persistentCacheGrp;
/**
- * @param name Name.
* @param cacheFilter Node filter.
* @param cacheMode Cache mode.
* @param persistentCacheGrp Persistence is configured for cache or
not.
*/
- CacheGroupAffinity(
- String name,
- IgnitePredicate<ClusterNode> cacheFilter,
- CacheMode cacheMode,
- boolean persistentCacheGrp) {
- this.name = name;
+ CacheGroupAffinity(IgnitePredicate<ClusterNode> cacheFilter, CacheMode
cacheMode, boolean persistentCacheGrp) {
this.cacheFilter = cacheFilter;
this.cacheMode = cacheMode;
this.persistentCacheGrp = persistentCacheGrp;
diff --git
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
index 1c78694550b..b8c02b60005 100644
---
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
+++
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
@@ -26,7 +26,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteVersionUtils;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
/**
@@ -37,7 +40,7 @@ import org.jetbrains.annotations.NotNull;
* Two versions are compared in the following order: major number,
* minor number, maintenance number, revision timestamp.
*/
-public class IgniteProductVersion implements Comparable<IgniteProductVersion>,
Externalizable {
+public class IgniteProductVersion implements Comparable<IgniteProductVersion>,
Externalizable, Message {
/** */
private static final long serialVersionUID = 0L;
@@ -52,25 +55,31 @@ public class IgniteProductVersion implements
Comparable<IgniteProductVersion>, E
Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)([-.]([^0123456789][^-]+)(-SNAPSHOT)?)?(-(\\d+))?(-([\\da-f]+))?");
/** Major version number. */
- private byte major;
+ @Order(0)
+ byte major;
/** Minor version number. */
- private byte minor;
+ @Order(1)
+ byte minor;
/** Maintenance version number. */
- private byte maintenance;
+ @Order(2)
+ byte maintenance;
/** Stage of development. */
- private String stage;
+ @Order(3)
+ String stage;
/** Revision timestamp. */
- private long revTs;
+ @Order(4)
+ long revTs;
/** Revision hash. */
- private byte[] revHash;
+ @Order(5)
+ byte[] revHash;
/**
- * Empty constructor required by {@link Externalizable}.
+ * Empty constructor required by {@link Externalizable} and {@link
DiscoveryMessageFactory}.
*/
public IgniteProductVersion() {
// No-op.
@@ -265,6 +274,11 @@ public class IgniteProductVersion implements
Comparable<IgniteProductVersion>, E
revHash = U.readByteArray(in);
}
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -116;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revTs *
1000);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 02f56983d41..0fce9fac5bb 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -2240,7 +2240,7 @@ class ClientImpl extends TcpDiscoveryImpl {
nodeAdded = true;
if (msg.topologyHistory() != null)
- topHist.putAll(msg.topologyHistory());
+ topHist.putAll(upcast(msg.topologyHistory()));
}
else {
if (log.isDebugEnabled())
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f5495299899..ac6218aa32b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -643,12 +643,11 @@ class ServerImpl extends TcpDiscoveryImpl {
processed.add(n);
- List<ClusterNode> top = U.arrayList(nodes,
F.notIn(processed));
+ Collection<ClusterNode> top = upcast(U.arrayList(nodes,
F.notIn(processed)));
topVer++;
- NavigableMap<Long, Collection<ClusterNode>> hist =
updateTopologyHistory(topVer,
- Collections.unmodifiableList(top));
+ NavigableMap<Long, Collection<ClusterNode>> hist =
updateTopologyHistory(topVer, top);
lsnr.onDiscovery(
new DiscoveryNotification(EVT_NODE_FAILED, topVer, n,
top, hist, null, null)
@@ -1907,7 +1906,7 @@ class ServerImpl extends TcpDiscoveryImpl {
hist = new TreeMap<>(topHist);
}
- nodeAddedMsg.topologyHistory(hist);
+ nodeAddedMsg.topologyHistory(downcast(hist));
}
}
}
@@ -2451,6 +2450,31 @@ class ServerImpl extends TcpDiscoveryImpl {
return spi.ignite() instanceof IgniteEx ?
((IgniteEx)spi.ignite()).context().workersRegistry() : null;
}
+ /**
+ * Upcasts collection type.
+ *
+ * @param <P> Parent type.
+ * @param <C> Child type.
+ * @param c Initial collection.
+ * @return Resulting collection.
+ */
+ private static <P, C extends P> Collection<P> upcast(Collection<C> c) {
+ return (Collection<P>)c;
+ }
+
+ /**
+ * Downcasts type of map's collection value.
+ *
+ * @param <K> Map key type.
+ * @param <P> Parent type.
+ * @param <C> Child type.
+ * @param m Initial collections map.
+ * @return Resulting map.
+ */
+ private static <K, P, C extends P> Map<K, Collection<C>> downcast(Map<K,
Collection<P>> m) {
+ return (Map<K, Collection<C>>)(Map)m;
+ }
+
/**
* Discovery messages history used for client reconnect.
*/
@@ -5086,7 +5110,7 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodesDiscoDataList = new ArrayList<>();
topHist.clear();
- topHist.putAll(msg.topologyHistory());
+ topHist.putAll(upcast(msg.topologyHistory()));
pendingMsgs.reset(msg.messages());
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e115a3cca03..c18760dcb56 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -43,7 +43,6 @@ import
org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiContext;
@@ -140,18 +139,6 @@ abstract class TcpDiscoveryImpl {
/** Tracing. */
protected Tracing tracing;
- /**
- * Upcasts collection type.
- *
- * @param c Initial collection.
- * @return Resulting collection.
- */
- protected static <T extends R, R> Collection<R> upcast(Collection<T> c) {
- A.notNull(c, "c");
-
- return (Collection<R>)c;
- }
-
/**
* @param spi Adapter.
*/
@@ -495,6 +482,19 @@ abstract class TcpDiscoveryImpl {
return msg.traceLogLevel() ? traceLog : debugLog;
}
+ /**
+ * Upcasts type of map's collection value.
+ *
+ * @param <K> Map key type.
+ * @param <P> Parent type.
+ * @param <C> Child type.
+ * @param m Initial map of collections.
+ * @return Resulting map.
+ */
+ protected static <K, P, C extends P> Map<K, Collection<P>> upcast(Map<K,
Collection<C>> m) {
+ return (Map<K, Collection<P>>)(Map)m;
+ }
+
/**
*
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 89f1f492e92..0630144dc75 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -30,11 +30,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -44,8 +46,11 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
@@ -58,27 +63,38 @@ import static
org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
* <tt>public</tt> due to certain limitations of Java technology.
*/
public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements
IgniteClusterNode,
- Comparable<TcpDiscoveryNode>, Externalizable {
+ Comparable<TcpDiscoveryNode>, Externalizable, MarshallableMessage {
/** */
private static final long serialVersionUID = 0L;
/** Node ID. */
- private volatile UUID id;
+ @Order(0)
+ volatile UUID id;
/** Consistent ID. */
@GridToStringInclude
private Object consistentId;
+ /** Serialized {@link #consistentId}. */
+ @Order(1)
+ byte[] consistentIdBytes;
+
/** Node attributes. */
@GridToStringExclude
private Map<String, Object> attrs;
+ /** Serialized {@link #attrs}. */
+ @Order(2)
+ byte[] attrsBytes;
+
/** Internal discovery addresses as strings. */
@GridToStringInclude
- private Collection<String> addrs;
+ @Order(3)
+ Collection<String> addrs;
/** Internal discovery host names as strings. */
- private Collection<String> hostNames;
+ @Order(4)
+ Collection<String> hostNames;
/** */
@GridToStringInclude
@@ -86,21 +102,29 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
/** */
@GridToStringInclude
- private int discPort;
+ @Order(5)
+ int discPort;
/** Node metrics. */
@GridToStringExclude
- private volatile ClusterMetrics metrics;
+ volatile ClusterMetrics metrics;
+
+ /** Node metrics message. */
+ @GridToStringExclude
+ @Order(6)
+ volatile TcpDiscoveryNodeMetricsMessage metricsMsg;
/** Node cache metrics. */
@GridToStringExclude
private volatile Map<Integer, CacheMetrics> cacheMetrics;
/** Node order in the topology. */
- private volatile long order;
+ @Order(7)
+ volatile long order;
/** Node order in the topology (internal). */
- private volatile long intOrder;
+ @Order(8)
+ volatile long intOrder;
/** The most recent time when metrics update message was received from the
node. */
@GridToStringExclude
@@ -122,7 +146,8 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
private boolean loc;
/** Version. */
- private IgniteProductVersion ver;
+ @Order(9)
+ IgniteProductVersion ver;
/** Alive check time (used by clients). */
@GridToStringExclude
@@ -130,7 +155,8 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
/** Client router node ID. */
@GridToStringExclude
- private UUID clientRouterNodeId;
+ @Order(10)
+ UUID clientRouterNodeId;
/** */
@GridToStringExclude
@@ -193,6 +219,38 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
sockAddrs = U.toSocketAddresses(this, discPort);
}
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (attrs != null)
+ attrsBytes = U.marshal(marsh, attrs);
+
+ if (consistentId != null)
+ consistentIdBytes = U.marshal(marsh, consistentId);
+
+ metricsMsg = new TcpDiscoveryNodeMetricsMessage(metrics);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ if (attrsBytes != null)
+ attrs = U.unmarshal(marsh, attrsBytes, clsLdr);
+
+ if (consistentIdBytes != null)
+ consistentId = U.unmarshal(marsh, consistentIdBytes, clsLdr);
+
+ if (metricsMsg != null)
+ metrics = new ClusterMetricsSnapshot(metricsMsg);
+
+ attrsBytes = null;
+ consistentIdBytes = null;
+ metricsMsg = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -117;
+ }
+
/**
* @return Last successfully connected address.
*/
@@ -304,7 +362,7 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
/** {@inheritDoc} */
@Override public void setCacheMetrics(Map<Integer, CacheMetrics>
cacheMetrics) {
- this.cacheMetrics = cacheMetrics != null ? cacheMetrics :
Collections.<Integer, CacheMetrics>emptyMap();
+ this.cacheMetrics = cacheMetrics != null ? cacheMetrics :
Collections.emptyMap();
}
/**
@@ -463,7 +521,7 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
/** {@inheritDoc} */
@Override public boolean isClient() {
if (!cacheCliInit) {
- Boolean clientModeAttr =
((ClusterNode)this).attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+ Boolean clientModeAttr =
(Boolean)attrs.get(IgniteNodeAttributes.ATTR_CLIENT_MODE);
cacheCli = clientModeAttr != null && clientModeAttr;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 2b6bffa4b4f..361ced45a9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -17,12 +17,8 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -32,17 +28,13 @@ import static
org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
* Initial message sent by a node that wants to enter topology.
* Sent to random node during SPI start. Then forwarded directly to
coordinator.
*/
-public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage {
+public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceableMessage {
/** */
private static final long serialVersionUID = 0L;
/** New node that wants to join the topology. */
- private TcpDiscoveryNode node;
-
- /** Serialized {@link #node}. */
- // TODO Remove the field after completing
https://issues.apache.org/jira/browse/IGNITE-27899.
@Order(0)
- byte[] nodeBytes;
+ TcpDiscoveryNode node;
/** Discovery data container. */
@Order(1)
@@ -94,17 +86,6 @@ public class TcpDiscoveryJoinRequestMessage extends
TcpDiscoveryAbstractTraceabl
setFlag(RESPONDED_FLAG_POS, responded);
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
- if (node != null)
- nodeBytes = U.marshal(marsh, node);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
- if (nodeBytes != null)
- node = U.unmarshal(marsh, nodeBytes, clsLdr);
- }
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 9c0f7a1704b..ac56920d1f9 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -25,7 +25,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.jetbrains.annotations.Nullable;
@@ -34,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
*/
@TcpDiscoveryEnsureDelivery
@TcpDiscoveryRedirectToClient
-public class TcpDiscoveryNodeAddFinishedMessage extends
TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage {
+public class TcpDiscoveryNodeAddFinishedMessage extends
TcpDiscoveryAbstractTraceableMessage {
/** */
private static final long serialVersionUID = 0L;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index ffe8a1bf412..ce375a30109 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -20,40 +20,29 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;
/**
- * TODO: Use NodeMessage for {@link TcpDiscoveryNode} and {@link ClusterNode}
after https://issues.apache.org/jira/browse/IGNITE-27899
* Message telling nodes that new node should be added to topology.
* When newly added node receives the message it connects to its next and
finishes
* join process.
*/
@TcpDiscoveryEnsureDelivery
@TcpDiscoveryRedirectToClient
-public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage {
+public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableMessage {
/** */
private static final long serialVersionUID = 0L;
/** Added node. */
- private TcpDiscoveryNode node;
-
- /** Marshalled {@link #node}. */
@Order(0)
- @GridToStringExclude
- byte[] nodeBytes;
+ TcpDiscoveryNode node;
/** */
@Order(1)
@@ -65,24 +54,16 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
- private Collection<TcpDiscoveryNode> top;
-
- /** Marshalled {@link #top}. */
@Order(3)
- @GridToStringExclude
- @Nullable byte[] topBytes;
+ @Nullable Collection<TcpDiscoveryNode> top;
/** */
@GridToStringInclude
private transient Collection<TcpDiscoveryNode> clientTop;
/** Topology snapshots history. */
- private Map<Long, Collection<ClusterNode>> topHist;
-
- /** Marshalled {@link #topHist}. */
@Order(4)
- @GridToStringExclude
- @Nullable byte[] topHistBytes;
+ Map<Long, Collection<TcpDiscoveryNode>> topHist;
/** Start time of the first grid node. */
@Order(5)
@@ -124,13 +105,10 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
super(msg);
node = msg.node;
- nodeBytes = msg.nodeBytes;
pendingMsgsMsg = msg.pendingMsgsMsg;
top = msg.top;
- topBytes = msg.topBytes;
clientTop = msg.clientTop;
topHist = msg.topHist;
- topHistBytes = msg.topHistBytes;
dataPacket = msg.dataPacket;
gridStartTime = msg.gridStartTime;
}
@@ -178,7 +156,6 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*/
public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
this.top = top;
- topBytes = null;
}
/**
@@ -202,7 +179,7 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*
* @return Map with topology snapshots history.
*/
- public Map<Long, Collection<ClusterNode>> topologyHistory() {
+ public Map<Long, Collection<TcpDiscoveryNode>> topologyHistory() {
return topHist;
}
@@ -211,9 +188,8 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
*
* @param topHist Map with topology snapshots history.
*/
- public void topologyHistory(@Nullable Map<Long, Collection<ClusterNode>>
topHist) {
+ public void topologyHistory(@Nullable Map<Long,
Collection<TcpDiscoveryNode>> topHist) {
this.topHist = topHist;
- topHistBytes = null;
}
/**
@@ -246,36 +222,11 @@ public class TcpDiscoveryNodeAddedMessage extends
TcpDiscoveryAbstractTraceableM
return gridStartTime;
}
- /** @param marsh marshaller. */
- @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
- if (node != null)
- nodeBytes = U.marshal(marsh, node);
-
- if (top != null)
- topBytes = U.marshal(marsh, top);
-
- if (topHist != null)
- topHistBytes = U.marshal(marsh, topHist);
- }
-
/** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
- if (nodeBytes != null)
- node = U.unmarshal(marsh, nodeBytes, clsLdr);
-
- if (topBytes != null)
- top = U.unmarshal(marsh, topBytes, clsLdr);
-
- if (topHistBytes != null)
- topHist = U.unmarshal(marsh, topHistBytes, clsLdr);
-
- nodeBytes = null;
- topBytes = null;
- topHistBytes = null;
+ @Override public short directType() {
+ return 29;
}
-
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super",
super.toString());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
index 5d8e966851c..f994cee551f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
@@ -29,7 +29,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+
+import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.util.collection.BitSetIntSet;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
@@ -49,7 +52,11 @@ import static org.junit.Assert.fail;
*/
public class GridAffinityAssignmentV2Test {
/** */
- protected DiscoveryMetricsProvider metrics = new
SerializableMetricsProvider();
+ protected DiscoveryMetricsProvider metrics = new
SerializableMetricsProvider() {
+ @Override public ClusterMetrics metrics() {
+ return new ClusterMetricsSnapshot();
+ }
+ };
/** */
protected IgniteProductVersion ver = new IgniteProductVersion();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java
index e07aee83326..ddd01e7c664 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java
@@ -100,8 +100,11 @@ public class TcpDiscoveryDeadNodeAddressResolvingTest
extends GridCommonAbstract
ClusterNode clusterNode = node.get();
- Object sockAddrs = GridTestUtils.getFieldValue(clusterNode,
"sockAddrs");
- assertNull(sockAddrs);
+ if (clusterNode instanceof TcpDiscoveryNode) {
+ Object sockAddrs = GridTestUtils.getFieldValue(clusterNode,
"sockAddrs");
+
+ assertNull(sockAddrs);
+ }
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 7ccf15f6773..caed628d1d6 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -106,7 +106,7 @@ import static org.junit.Assert.fail;
@SuppressWarnings("TransientFieldInNonSerializableClass")
public class IgniteProcessProxy implements IgniteEx {
/** Grid proxies. */
- private static final transient ConcurrentMap<String, IgniteProcessProxy>
gridProxies = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String, IgniteProcessProxy> gridProxies
= new ConcurrentHashMap<>();
/** Property that specify alternative {@code JAVA_HOME}. */
private static final String TEST_MULTIJVM_JAVA_HOME =
"test.multijvm.java.home";