This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new eec735a IGNITE-12576: nodeId replaced with consistentId in
TcpCommunicationMetricsListener. (#7310)
eec735a is described below
commit eec735a4ea39439d57a1da89b5cc17bfbff2a74c
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Jan 30 11:24:12 2020 +0300
IGNITE-12576: nodeId replaced with consistentId in
TcpCommunicationMetricsListener. (#7310)
---
.../tcp/TcpCommunicationMetricsListener.java | 111 ++++++++++++---------
.../spi/communication/tcp/TcpCommunicationSpi.java | 58 +++++++----
.../TcpCommunicationConnectionCheckFuture.java | 17 ++--
...idTcpCommunicationSpiMultithreadedSelfTest.java | 2 +-
.../tcp/TcpCommunicationStatisticsTest.java | 18 ++--
5 files changed, 122 insertions(+), 84 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index 504cbb4d..d89fe2c 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
@@ -33,6 +35,7 @@ import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static
org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_DESC;
@@ -40,14 +43,14 @@ import static
org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_M
import static
org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_DESC;
import static
org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
-import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC;
-import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_DESC;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_NAME;
-import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_DESC;
-import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_DESC;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_NAME;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_DESC;
@@ -63,6 +66,9 @@ class TcpCommunicationMetricsListener {
/** Metrics registry. */
private final org.apache.ignite.internal.processors.metric.MetricRegistry
mreg;
+ /** Current ignite instance. */
+ private final Ignite ignite;
+
/** All registered metrics. */
private final Set<ThreadMetrics> allMetrics =
Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -81,11 +87,11 @@ class TcpCommunicationMetricsListener {
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)}
of {@code rcvdMsgsMetricsByType}. */
private final Function<Short, LongAdderMetric>
rcvdMsgsCntByTypeMetricFactory;
- /** Function to be used in {@link Map#computeIfAbsent(Object, Function)}
of {@code sentMsgsMetricsByNodeId}. */
- private final Function<UUID, LongAdderMetric>
sentMsgsCntByNodeIdMetricFactory;
+ /** Function to be used in {@link Map#computeIfAbsent(Object, Function)}
of {@code #sentMsgsMetricsByConsistentId}. */
+ private final Function<Object, LongAdderMetric>
sentMsgsCntByConsistentIdMetricFactory;
- /** Function to be used in {@link Map#computeIfAbsent(Object, Function)}
of {@code rcvdMsgsMetricsByNodeId}. */
- private final Function<UUID, LongAdderMetric>
rcvdMsgsCntByNodeIdMetricFactory;
+ /** Function to be used in {@link Map#computeIfAbsent(Object, Function)}
of {@code #rcvdMsgsMetricsByConsistentId}. */
+ private final Function<Object, LongAdderMetric>
rcvdMsgsCntByConsistentIdMetricFactory;
/** Sent bytes count metric.*/
private final LongAdderMetric sentBytesMetric;
@@ -106,8 +112,9 @@ class TcpCommunicationMetricsListener {
private volatile Map<Short, String> msgTypMap;
/** */
- public TcpCommunicationMetricsListener(GridMetricManager mmgr) {
+ public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite
ignite) {
this.mmgr = mmgr;
+ this.ignite = ignite;
mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);
@@ -120,13 +127,13 @@ class TcpCommunicationMetricsListener {
RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
);
- sentMsgsCntByNodeIdMetricFactory = nodeId ->
- mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME,
nodeId.toString()))
- .findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ sentMsgsCntByConsistentIdMetricFactory = consistentId ->
+ mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME,
consistentId.toString()))
+ .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
- rcvdMsgsCntByNodeIdMetricFactory = nodeId ->
- mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME,
nodeId.toString()))
- .findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ rcvdMsgsCntByConsistentIdMetricFactory = consistentId ->
+ mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME,
consistentId.toString()))
+
.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME,
SENT_BYTES_METRIC_DESC);
rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME,
RECEIVED_BYTES_METRIC_DESC);
@@ -139,9 +146,9 @@ class TcpCommunicationMetricsListener {
if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME +
SEPARATOR))
return;
-
((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME,
SENT_MESSAGES_BY_NODE_ID_METRIC_DESC);
+
((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
-
((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME,
RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC);
+
((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
});
}
@@ -154,11 +161,11 @@ class TcpCommunicationMetricsListener {
* Collects statistics for message sent by SPI.
*
* @param msg Sent message.
- * @param nodeId Receiver node id.
+ * @param consistentId Receiver node consistent id.
*/
- public void onMessageSent(Message msg, UUID nodeId) {
+ public void onMessageSent(Message msg, Object consistentId) {
assert msg != null;
- assert nodeId != null;
+ assert consistentId != null;
if (msg instanceof GridIoMessage) {
msg = ((GridIoMessage) msg).message();
@@ -167,7 +174,7 @@ class TcpCommunicationMetricsListener {
sentMsgsMetric.increment();
- threadMetrics.get().onMessageSent(msg, nodeId);
+ threadMetrics.get().onMessageSent(msg, consistentId);
}
}
@@ -175,11 +182,11 @@ class TcpCommunicationMetricsListener {
* Collects statistics for message received by SPI.
*
* @param msg Received message.
- * @param nodeId Sender node id.
+ * @param consistentId Sender node consistent id.
*/
- public void onMessageReceived(Message msg, UUID nodeId) {
+ public void onMessageReceived(Message msg, Object consistentId) {
assert msg != null;
- assert nodeId != null;
+ assert consistentId != null;
if (msg instanceof GridIoMessage) {
msg = ((GridIoMessage) msg).message();
@@ -188,7 +195,7 @@ class TcpCommunicationMetricsListener {
rcvdMsgsMetric.increment();
- threadMetrics.get().onMessageReceived(msg, nodeId);
+ threadMetrics.get().onMessageReceived(msg, consistentId);
}
}
@@ -247,7 +254,7 @@ class TcpCommunicationMetricsListener {
* @return Map containing sender nodes and respective counts.
*/
public Map<UUID, Long> receivedMessagesByNode() {
- return
collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ return
collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
}
/**
@@ -265,7 +272,7 @@ class TcpCommunicationMetricsListener {
* @return Map containing receiver nodes and respective counts.
*/
public Map<UUID, Long> sentMessagesByNode() {
- return
collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ return
collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
}
/** */
@@ -296,13 +303,20 @@ class TcpCommunicationMetricsListener {
protected Map<UUID, Long> collectMessagesCountByNodeId(String metricName) {
Map<UUID, Long> res = new HashMap<>();
+ Map<String, UUID> nodesMapping =
ignite.cluster().nodes().stream().collect(toMap(
+ node -> node.consistentId().toString(), ClusterNode::id
+ ));
+
String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;
for (ReadOnlyMetricRegistry mreg : mmgr) {
if (mreg.name().startsWith(mregPrefix)) {
- String nodeIdStr = mreg.name().substring(mregPrefix.length());
+ String nodeConsIdStr =
mreg.name().substring(mregPrefix.length());
+
+ UUID nodeId = nodesMapping.get(nodeConsIdStr);
- UUID nodeId = UUID.fromString(nodeIdStr);
+ if (nodeId == null)
+ continue;
res.put(nodeId,
mreg.<LongMetric>findMetric(metricName).value());
}
@@ -330,23 +344,26 @@ class TcpCommunicationMetricsListener {
for (ReadOnlyMetricRegistry mreg : mmgr) {
if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME +
SEPARATOR)) {
- mreg.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME).reset();
+
mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
-
mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME).reset();
+
mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
}
}
}
/**
- * @param nodeId Left node id.
+ * @param consistentId Consistent id of the node.
*/
- public void onNodeLeft(UUID nodeId) {
+ public void onNodeLeft(Object consistentId) {
+ // Tricky part - these maps are not thread-safe. Ideally it's only
required to delete one entry from each one
+ // of them, but this would lead to syncs in communication worker
threads. Instead, we just "clean" them so they
+ // will be filled later lazily with the same data.
for (ThreadMetrics threadMetrics : allMetrics) {
- threadMetrics.rcvdMsgsMetricsByNodeId = new HashMap<>();
- threadMetrics.sentMsgsMetricsByNodeId = new HashMap<>();
+ threadMetrics.sentMsgsMetricsByConsistentId = new HashMap<>();
+ threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>();
}
- mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME,
nodeId.toString()));
+ mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME,
consistentId.toString()));
}
/**
@@ -402,38 +419,36 @@ class TcpCommunicationMetricsListener {
private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new
HashMap<>();
/**
- * Sent messages count metrics grouped by message node id.
+ * Sent messages count metrics grouped by message node consistent id.
*/
- public volatile Map<UUID, LongAdderMetric> sentMsgsMetricsByNodeId =
new HashMap<>();
+ public volatile Map<Object, LongAdderMetric>
sentMsgsMetricsByConsistentId = new HashMap<>();
/**
- * Received messages metrics count grouped by message node id.
+ * Received messages metrics count grouped by message node consistent
id.
*/
- public volatile Map<UUID, LongAdderMetric> rcvdMsgsMetricsByNodeId =
new HashMap<>();
+ public volatile Map<Object, LongAdderMetric>
rcvdMsgsMetricsByConsistentId = new HashMap<>();
/**
* Collects statistics for message sent by SPI.
- *
* @param msg Sent message.
- * @param nodeId Receiver node id.
+ * @param consistentId Receiver node consistent id.
*/
- private void onMessageSent(Message msg, UUID nodeId) {
+ private void onMessageSent(Message msg, Object consistentId) {
sentMsgsMetricsByType.computeIfAbsent(msg.directType(),
sentMsgsCntByTypeMetricFactory).increment();
- sentMsgsMetricsByNodeId.computeIfAbsent(nodeId,
sentMsgsCntByNodeIdMetricFactory).increment();
+ sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId,
sentMsgsCntByConsistentIdMetricFactory).increment();
}
/**
* Collects statistics for message received by SPI.
- *
* @param msg Received message.
- * @param nodeId Sender node id.
+ * @param consistentId Sender node consistent id.
*/
- private void onMessageReceived(Message msg, UUID nodeId) {
+ private void onMessageReceived(Message msg, Object consistentId) {
rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(),
rcvdMsgsCntByTypeMetricFactory).increment();
- rcvdMsgsMetricsByNodeId.computeIfAbsent(nodeId,
rcvdMsgsCntByNodeIdMetricFactory).increment();
+ rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId,
rcvdMsgsCntByConsistentIdMetricFactory).increment();
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1f51879..ce324ca 100755
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -357,6 +357,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
/** Connection index meta for session. */
public static final int CONN_IDX_META =
GridNioSessionMetaKey.nextUniqueKey();
+ /** Node consistent id meta for session. */
+ public static final int CONSISTENT_ID_META =
GridNioSessionMetaKey.nextUniqueKey();
+
/** Message tracker meta for session. */
private static final int TRACKER_META =
GridNioSessionMetaKey.nextUniqueKey();
@@ -421,25 +424,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME =
"sentMessagesByType";
/** */
- public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC = "Total
number of messages with given type sent by current node";
+ public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC =
+ "Total number of messages with given type sent by current node";
/** */
public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME =
"receivedMessagesByType";
/** */
- public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC = "Total
number of messages with given type received by current node";
+ public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC =
+ "Total number of messages with given type received by current node";
/** */
- public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_NAME =
"sentMessagesToNode";
+ public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME
= "sentMessagesToNode";
/** */
- public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_DESC = "Total
number of messages sent by current node to the given node";
+ public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
=
+ "Total number of messages sent by current node to the given node";
/** */
- public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME =
"receivedMessagesFromNode";
+ public static final String
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME =
"receivedMessagesFromNode";
/** */
- public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC =
"Total number of messages received by current node from the given node";
+ public static final String
RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC =
+ "Total number of messages received by current node from the given
node";
/** */
private ConnectGateway connectGate;
@@ -623,6 +630,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
return;
}
+ ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId());
+
final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
assert old == null;
@@ -786,13 +795,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
}
@Override public void onMessageSent(GridNioSession ses, Message
msg) {
- ConnectionKey connKey = ses.meta(CONN_IDX_META);
+ Object consistentId = ses.meta(CONSISTENT_ID_META);
- if (connKey != null) {
- UUID nodeId = connKey.nodeId();
-
- metricsLsnr.onMessageSent(msg, nodeId);
- }
+ if (consistentId != null)
+ metricsLsnr.onMessageSent(msg, consistentId);
}
private void onChannelCreate(
@@ -860,6 +866,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
}
}
else {
+ Object consistentId = ses.meta(CONSISTENT_ID_META);
+
+ assert consistentId != null;
+
if (isChannelConnIdx(connKey.connectionIndex())) {
if (ses.meta(CHANNEL_FUT_META) == null)
onChannelCreate((GridSelectorNioSessionImpl)ses,
connKey, msg);
@@ -884,7 +894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
}
if (msg instanceof RecoveryLastReceivedMessage) {
- metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+ metricsLsnr.onMessageReceived(msg, consistentId);
GridNioRecoveryDescriptor recovery =
ses.outRecoveryDescriptor();
@@ -935,7 +945,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
}
}
- metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+ metricsLsnr.onMessageReceived(msg, consistentId);
IgniteRunnable c;
@@ -1393,7 +1403,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
@MetricManagerResource
private void injectMetricManager(GridMetricManager mmgr) {
if (mmgr != null)
- metricsLsnr = new TcpCommunicationMetricsListener(mmgr);
+ metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite);
}
/**
@@ -2767,12 +2777,13 @@ public class TcpCommunicationSpi extends
IgniteSpiAdapter implements Communicati
}
/**
+ * @param consistentId Consistent id of the node.
* @param nodeId Left node ID.
*/
- void onNodeLeft(UUID nodeId) {
+ void onNodeLeft(Object consistentId, UUID nodeId) {
assert nodeId != null;
- metricsLsnr.onNodeLeft(nodeId);
+ metricsLsnr.onNodeLeft(consistentId);
GridCommunicationClient[] clients0 = clients.remove(nodeId);
@@ -3623,6 +3634,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
recoveryDesc.onHandshake(rcvCnt);
+ meta.put(CONSISTENT_ID_META, node.consistentId());
meta.put(CONN_IDX_META, connKey);
meta.put(GridNioServer.RECOVERY_DESC_META_KEY,
recoveryDesc);
@@ -4492,7 +4504,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
assert evt instanceof DiscoveryEvent : evt;
assert evt.type() == EVT_NODE_LEFT || evt.type() ==
EVT_NODE_FAILED;
- onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+ ClusterNode node = ((DiscoveryEvent)evt).eventNode();
+
+ onNodeLeft(node.consistentId(), node.id());
}
/** {@inheritDoc} */
@@ -4698,7 +4712,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
GridNioRecoveryDescriptor recovery = null;
if (!usePairedConnections(node) && client instanceof
GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new
ConnectionKey(node.id(), client.connectionIndex(), -1));
+ recovery = recoveryDescs.get(new ConnectionKey(
+ node.id(), client.connectionIndex(), -1)
+ );
if (recovery != null && recovery.lastAcknowledged() !=
recovery.received()) {
RecoveryLastReceivedMessage msg = new
RecoveryLastReceivedMessage(recovery.received());
@@ -4724,7 +4740,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
implements Communicati
if (idleTime >= idleConnTimeout) {
if (recovery == null && usePairedConnections(node))
- recovery = outRecDescs.get(new
ConnectionKey(node.id(), client.connectionIndex(), -1));
+ recovery = outRecDescs.get(new ConnectionKey(
+ node.id(), client.connectionIndex(), -1)
+ );
if (recovery != null &&
recovery.nodeAlive(getSpiContext().node(nodeId)) &&
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 46fdb0b..9caff6a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -142,14 +142,14 @@ public class TcpCommunicationConnectionCheckFuture
extends GridFutureAdapter<Bit
if (addrs.size() == 1) {
SingleAddressConnectFuture fut = new
SingleAddressConnectFuture(i);
- fut.init(addrs.iterator().next(), node.id());
+ fut.init(addrs.iterator().next(), node.consistentId(),
node.id());
futs[i] = fut;
}
else {
MultipleAddressesConnectFuture fut = new
MultipleAddressesConnectFuture(i);
- fut.init(addrs, node.id());
+ fut.init(addrs, node.consistentId(), node.id());
futs[i] = fut;
}
@@ -292,9 +292,10 @@ public class TcpCommunicationConnectionCheckFuture extends
GridFutureAdapter<Bit
/**
* @param addr Node address.
+ * @param consistentId Consistent if of the node.
* @param rmtNodeId Id of node to open connection check session with.
*/
- public void init(InetSocketAddress addr, UUID rmtNodeId) {
+ public void init(InetSocketAddress addr, Object consistentId, UUID
rmtNodeId) {
boolean connect;
try {
@@ -317,7 +318,10 @@ public class TcpCommunicationConnectionCheckFuture extends
GridFutureAdapter<Bit
sesMeta = new GridLeanMap<>(3);
// Set dummy key to identify connection-check outgoing
connection.
- sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, new
ConnectionKey(rmtNodeId, -1, -1, true));
+ ConnectionKey connKey = new ConnectionKey(rmtNodeId, -1, -1,
true);
+
+ sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, connKey);
+ sesMeta.put(TcpCommunicationSpi.CONSISTENT_ID_META,
consistentId);
sesMeta.put(SES_FUT_META, this);
nioSrvr.createSession(ch, sesMeta, true, new
IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
@@ -422,9 +426,10 @@ public class TcpCommunicationConnectionCheckFuture extends
GridFutureAdapter<Bit
/**
* @param addrs Node addresses.
+ * @param consistentId Consistent if of the node.
* @param rmtNodeId Id of node to open connection check session with.
*/
- void init(Collection<InetSocketAddress> addrs, UUID rmtNodeId) {
+ void init(Collection<InetSocketAddress> addrs, Object consistentId,
UUID rmtNodeId) {
SingleAddressConnectFuture[] futs = new
SingleAddressConnectFuture[addrs.size()];
for (int i = 0; i < addrs.size(); i++) {
@@ -442,7 +447,7 @@ public class TcpCommunicationConnectionCheckFuture extends
GridFutureAdapter<Bit
int idx = 0;
for (InetSocketAddress addr : addrs) {
- futs[idx++].init(addr, rmtNodeId);
+ futs[idx++].init(addr, consistentId, rmtNodeId);
if (resCnt == Integer.MAX_VALUE)
return;
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index cfde86b..a53b43b 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -349,7 +349,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest
extends GridSpiAbstrac
while (run.get() &&
!Thread.currentThread().isInterrupted()) {
U.sleep(interval * 3 / 2);
-
((TcpCommunicationSpi)spis.get(from.id())).onNodeLeft(to.id());
+
((TcpCommunicationSpi)spis.get(from.id())).onNodeLeft(to.consistentId(),
to.id());
}
}
catch (IgniteInterruptedCheckedException ignored) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 399b8a3..99840c8 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -49,8 +49,8 @@ import
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
-import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME;
-import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
/**
* Test for TcpCommunicationSpi statistics.
@@ -141,17 +141,17 @@ public class TcpCommunicationStatisticsTest extends
GridCommonAbstractTest {
startGrids(2);
try {
- UUID node0Id = grid(0).localNode().id();
- UUID node1Id = grid(1).localNode().id();
+ Object node0consistentId = grid(0).localNode().consistentId();
+ Object node1consistentId = grid(1).localNode().consistentId();
String node0regName = MetricUtils.metricName(
COMMUNICATION_METRICS_GROUP_NAME,
- node0Id.toString()
+ node0consistentId.toString()
);
String node1regName = MetricUtils.metricName(
COMMUNICATION_METRICS_GROUP_NAME,
- node1Id.toString()
+ node1consistentId.toString()
);
// Send custom message from node0 to node1.
@@ -205,11 +205,11 @@ public class TcpCommunicationStatisticsTest extends
GridCommonAbstractTest {
MetricRegistry mreg0 =
grid(0).context().metric().registry(node1regName);
MetricRegistry mreg1 =
grid(1).context().metric().registry(node0regName);
- LongAdderMetric sentMetric =
mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ LongAdderMetric sentMetric =
mreg0.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
assertNotNull(sentMetric);
assertEquals(mbean0.getSentMessagesCount(),
sentMetric.value());
- LongAdderMetric rcvMetric =
mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ LongAdderMetric rcvMetric =
mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
assertNotNull(rcvMetric);
assertEquals(mbean1.getReceivedMessagesCount(),
rcvMetric.value());
@@ -217,7 +217,7 @@ public class TcpCommunicationStatisticsTest extends
GridCommonAbstractTest {
mreg0 = grid(0).context().metric().registry(node1regName);
- sentMetric =
mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME);
+ sentMetric =
mreg0.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
assertNotNull(sentMetric); // Automatically generated by
MetricRegistryCreationListener.
assertEquals(0, sentMetric.value());
}