This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f088e3fed9d IGNITE-26179 Added metrics of TCP communication connection
pool (#12271)
f088e3fed9d is described below
commit f088e3fed9d404ee0b64cc1de9e6ba7eea6ef1db
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Oct 16 09:48:30 2025 +0300
IGNITE-26179 Added metrics of TCP communication connection pool (#12271)
---
.../internal/util/nio/GridCommunicationClient.java | 14 +
.../ignite/internal/util/nio/GridNioServer.java | 5 +-
.../ignite/internal/util/nio/GridNioSession.java | 7 +
.../internal/util/nio/GridNioSessionImpl.java | 9 +-
.../util/nio/GridSelectorNioSessionImpl.java | 8 +-
.../util/nio/GridTcpNioCommunicationClient.java | 16 +-
.../tcp/TcpCommunicationMetricsListener.java | 6 +-
.../spi/communication/tcp/TcpCommunicationSpi.java | 17 +-
.../tcp/internal/ConnectionClientPool.java | 516 ++++++++++++++-----
.../tcp/internal/RoundRobinConnectionPolicy.java | 22 +-
.../internal/util/nio/impl/MockNioSession.java | 5 +
.../CommunicationConnectionPoolMetricsTest.java | 556 +++++++++++++++++++++
...cpCommunicationSpiHalfOpenedConnectionTest.java | 14 +
.../IgniteSpiCommunicationSelfTestSuite.java | 3 +
.../zk/internal/ZookeeperDiscoverySpiTestBase.java | 10 +
15 files changed, 1069 insertions(+), 139 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index b4a9a954731..514dedf6d21 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -75,6 +75,20 @@ public interface GridCommunicationClient {
*/
public long getIdleTime();
+ /**
+ * Provides creation time of this client.
+ *
+ * @return Creation time of this client.
+ */
+ public long creationTime();
+
+ /**
+ * Provides number of pending messages.
+ *
+ * @return Number of pending messages.
+ */
+ public int messagesQueueSize();
+
/**
* @param data Data to send.
* @throws IgniteCheckedException If failed.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 4e43d28ea7e..aaa78cd1769 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -151,7 +151,8 @@ public class GridNioServer<T> {
public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME =
"outboundMessagesQueueSize";
/** */
- public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC =
"Number of messages waiting to be sent";
+ public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
+ = "Total number of messages waiting to be sent over all connections";
/** */
public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes";
@@ -2449,7 +2450,7 @@ public class GridNioServer<T> {
.append(", bytesRcvd0=").append(ses.bytesReceived0())
.append(", bytesSent=").append(ses.bytesSent())
.append(", bytesSent0=").append(ses.bytesSent0())
- .append(", opQueueSize=").append(ses.writeQueueSize());
+ .append(",
opQueueSize=").append(ses.messagesQueueSize());
if (!shortInfo) {
MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index 1b9ac6f874a..af0a3bba338 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -86,6 +86,13 @@ public interface GridNioSession {
*/
public long lastSendTime();
+ /**
+ * Returns number of pending messages.
+ *
+ * @return Number of pending messages.
+ */
+ public int messagesQueueSize();
+
/**
* Returns time when last send was scheduled on this session.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index ef8aadd909f..0211eb85dca 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -48,7 +48,7 @@ public class GridNioSessionImpl implements GridNioSession {
private final InetSocketAddress rmtAddr;
/** Session create timestamp. */
- private long createTime;
+ private final long createTime;
/** Session close timestamp. */
private final AtomicLong closeTime = new AtomicLong();
@@ -78,7 +78,7 @@ public class GridNioSessionImpl implements GridNioSession {
private volatile boolean readsPaused;
/** Filter chain that will handle write and close requests. */
- private GridNioFilterChain filterChain;
+ private final GridNioFilterChain filterChain;
/** Accepted flag. */
private final boolean accepted;
@@ -242,6 +242,11 @@ public class GridNioSessionImpl implements GridNioSession {
return sndSchedTime;
}
+ /** {@inheritDoc} */
+ @Override public int messagesQueueSize() {
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override public <T> T meta(int key) {
assert key < meta.length;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 7fceaad96ea..8004bce31ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -412,12 +412,8 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
return rmv;
}
- /**
- * Gets number of write requests in a queue that have not been processed
yet.
- *
- * @return Number of write requests.
- */
- int writeQueueSize() {
+ /** {@inheritDoc} */
+ @Override public int messagesQueueSize() {
return queue.sizex();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 056f4c20a32..4ff126fa4cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -25,7 +25,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -101,10 +100,7 @@ public class GridTcpNioCommunicationClient extends
GridAbstractCommunicationClie
if (closed())
throw new IgniteCheckedException("Client was closed: " + this);
- IgniteInternalFuture<?> fut = ses.send(data);
-
- if (fut.isDone())
- fut.get();
+ ses.send(data);
}
/** {@inheritDoc} */
@@ -146,6 +142,16 @@ public class GridTcpNioCommunicationClient extends
GridAbstractCommunicationClie
now - ses.lastSendTime());
}
+ /** {@inheritDoc} */
+ @Override public long creationTime() {
+ return ses.createTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int messagesQueueSize() {
+ return ses.messagesQueueSize();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridTcpNioCommunicationClient.class, this,
super.toString());
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index 266fafa982b..2fb542ceb8b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -147,8 +147,8 @@ public class TcpCommunicationMetricsListener {
rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME,
RECEIVED_MESSAGES_METRIC_DESC);
spiCtx.addMetricRegistryCreationListener(mreg -> {
- // Metrics for the specific nodes.
- if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME +
SEPARATOR))
+ // Metrics for the specific nodes or other communication metrics.
+ if (!TcpCommunicationSpi.isCommunicationMetrics(mreg.name()))
return;
((MetricRegistryImpl)mreg).longAdderMetric(
@@ -380,7 +380,7 @@ public class TcpCommunicationMetricsListener {
}
for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
- if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME +
SEPARATOR)) {
+ if (TcpCommunicationSpi.isCommunicationMetrics(mreg.name())) {
mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2beea5091d1..27e2d69dc23 100755
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -94,6 +94,7 @@ import org.jetbrains.annotations.TestOnly;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR;
import static
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.NOOP;
import static
org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX;
@@ -374,6 +375,17 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
this.lsnr = lsnr;
}
+ /**
+ * @param metricName Metric name.
+ * @return {@code True} if the metric name is a pure TCP Communication
metric. {@code False} if it is another metric or
+ * a metric of another TCP Communication component.
+ */
+ public static boolean isCommunicationMetrics(String metricName) {
+ return metricName.startsWith(COMMUNICATION_METRICS_GROUP_NAME +
SEPARATOR)
+ &&
!metricName.startsWith(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME +
SEPARATOR)
+ &&
!metricName.equals(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME);
+ }
+
/** {@inheritDoc} */
@Override public int getSentMessagesCount() {
// Listener could be not initialized yet, but discovery thread could
try to aggregate metrics.
@@ -597,7 +609,7 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
}
if (cfg.connectionsPerNode() > 1)
- connPlc = new RoundRobinConnectionPolicy(cfg);
+ connPlc = new RoundRobinConnectionPolicy(cfg.connectionsPerNode());
else
connPlc = new FirstConnectionPolicy();
@@ -684,7 +696,8 @@ public class TcpCommunicationSpi extends
TcpCommunicationConfigInitializer {
this,
stateProvider,
nioSrvWrapper,
- getName()
+ getName(),
+ ((IgniteEx)ignite).context().metric()
));
this.srvLsnr.setClientPool(clientPool);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index 42c21bbc5b5..9ae4b8d3b43 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -23,9 +23,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
@@ -36,6 +40,8 @@ import
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
@@ -54,6 +60,7 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Objects.nonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DISABLED_CLIENT_PORT;
import static
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.nodeAddresses;
import static
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.usePairedConnections;
@@ -65,9 +72,49 @@ public class ConnectionClientPool {
/** Time threshold to log too long connection establish. */
private static final int CONNECTION_ESTABLISH_THRESHOLD_MS = 100;
+ /** */
+ public static final long METRICS_UPDATE_THRESHOLD = U.millisToNanos(200);
+
+ /** */
+ public static final String SHARED_METRICS_REGISTRY_NAME =
metricName(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME,
+ "connectionPool");
+
+ /** */
+ public static final String METRIC_NAME_POOL_SIZE = "maxConnectionsCnt";
+
+ /** */
+ public static final String METRIC_NAME_PAIRED_CONNS = "isPaired";
+
+ /** */
+ public static final String METRIC_NAME_ASYNC_CONNS = "isAsync";
+
+ /** It is handy for a user to see the consistent ID of a problematic node. */
+ public static final String METRIC_NAME_CONSIST_ID = "consistentId";
+
+ /** */
+ public static final String METRIC_NAME_CUR_CNT = "currentConnectionsCnt";
+
+ /** */
+ public static final String METRIC_NAME_MSG_QUEUE_SIZE =
"outboundMessagesQueueSize";
+
+ /** */
+ public static final String METRIC_NAME_REMOVED_CNT =
"removedConnectionsCnt";
+
+ /** */
+ public static final String METRIC_NAME_MAX_NET_IDLE_TIME =
"maxNetworkIdleTime";
+
+ /** */
+ public static final String METRIC_NAME_AVG_LIFE_TIME =
"avgConnectionLifetime";
+
+ /** */
+ public static final String METRIC_NAME_ACQUIRING_THREADS_CNT =
"acquiringThreadsCnt";
+
/** Clients. */
private final ConcurrentMap<UUID, GridCommunicationClient[]> clients =
GridConcurrentFactory.newMap();
+ /** Metrics for each remote node. */
+ private final Map<UUID, NodeMetrics> metrics;
+
/** Config. */
private final TcpCommunicationConfiguration cfg;
@@ -116,6 +163,12 @@ public class ConnectionClientPool {
private boolean forcibleNodeKillEnabled = IgniteSystemProperties
.getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ /** */
+ private final GridMetricManager metricsMgr;
+
+ /** */
+ private volatile AtomicBoolean asyncMetric;
+
/**
* @param cfg Config.
* @param attrs Attributes.
@@ -129,6 +182,7 @@ public class ConnectionClientPool {
* @param clusterStateProvider Cluster state provider.
* @param nioSrvWrapper Nio server wrapper.
* @param igniteInstanceName Ignite instance name.
+ * @param metricsMgr Metrics manager. If {@code null}, no metrics are
created.
*/
public ConnectionClientPool(
TcpCommunicationConfiguration cfg,
@@ -142,23 +196,46 @@ public class ConnectionClientPool {
TcpCommunicationSpi tcpCommSpi,
ClusterStateProvider clusterStateProvider,
GridNioServerWrapper nioSrvWrapper,
- String igniteInstanceName
+ String igniteInstanceName,
+ GridMetricManager metricsMgr
) {
this.cfg = cfg;
this.attrs = attrs;
this.log = log;
this.metricsLsnr = metricsLsnr;
this.locNodeSupplier = locNodeSupplier;
- this.nodeGetter = nodeGetter;
this.msgFormatterSupplier = msgFormatterSupplier;
this.registry = registry;
this.tcpCommSpi = tcpCommSpi;
this.clusterStateProvider = clusterStateProvider;
this.nioSrvWrapper = nioSrvWrapper;
+ this.metricsMgr = metricsMgr;
+
+ this.nodeGetter = new Function<>() {
+ @Override public ClusterNode apply(UUID nodeId) {
+ ClusterNode node = nodeGetter.apply(nodeId);
+
+ if (node == null)
+ removeNodeMetrics(nodeId);
+
+ return node;
+ }
+ };
this.handshakeTimeoutExecutorService =
newSingleThreadScheduledExecutor(
new IgniteThreadFactory(igniteInstanceName,
"handshake-timeout-client")
);
+
+ if (metricsMgr != null) {
+ MetricRegistryImpl mreg =
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
+
+ mreg.register(METRIC_NAME_POOL_SIZE, () ->
cfg.connectionsPerNode(), "Maximal connections number to a remote node.");
+ mreg.register(METRIC_NAME_PAIRED_CONNS, () ->
cfg.usePairedConnections(), "Paired connections flag.");
+
+ metrics = new ConcurrentHashMap<>(64, 0.75f, Math.max(16,
Runtime.getRuntime().availableProcessors()));
+ }
+ else
+ metrics = null;
}
/**
@@ -167,6 +244,10 @@ public class ConnectionClientPool {
public void stop() {
this.stopping = true;
+ metricsMgr.remove(SHARED_METRICS_REGISTRY_NAME);
+
+ clients.keySet().forEach(this::removeNodeMetrics);
+
for (GridFutureAdapter<GridCommunicationClient> fut :
clientFuts.values()) {
if (fut instanceof ConnectionRequestFuture) {
// There's no way it would be done by itself at this point.
@@ -186,155 +267,188 @@ public class ConnectionClientPool {
* @throws IgniteCheckedException Thrown if any exception occurs.
*/
public GridCommunicationClient reserveClient(ClusterNode node, int
connIdx) throws IgniteCheckedException {
- assert node != null;
- assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
- || !(cfg.usePairedConnections() && usePairedConnections(node,
attrs.pairedConnection()))
- || GridNioServerWrapper.isChannelConnIdx(connIdx) : "Wrong
communication connection index: " + connIdx;
-
- if (locNodeSupplier.get().isClient()) {
- if (node.isClient()) {
- if (DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
- throw new IgniteSpiException("Cannot send message to the
client node with no server socket opened.");
+ NodeMetrics nodeMetrics = metrics.get(node.id());
+
+ if (nodeMetrics != null)
+ nodeMetrics.acquiringThreadsCnt.incrementAndGet();
+
+ try {
+ assert node != null;
+ assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
+ || !(cfg.usePairedConnections() && usePairedConnections(node,
attrs.pairedConnection()))
+ || GridNioServerWrapper.isChannelConnIdx(connIdx) : "Wrong
communication connection index: " + connIdx;
+
+ if (locNodeSupplier.get().isClient()) {
+ if (node.isClient()) {
+ if
(DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
+ throw new IgniteSpiException("Cannot send message to
the client node with no server socket opened.");
+ }
}
- }
- UUID nodeId = node.id();
+ UUID nodeId = node.id();
- if (log.isDebugEnabled())
- log.debug("The node client is going to reserve a connection
[nodeId=" + node.id() + ", connIdx=" + connIdx + "]");
+ if (log.isDebugEnabled())
+ log.debug("The node client is going to reserve a connection
[nodeId=" + node.id() + ", connIdx=" + connIdx + "]");
- while (true) {
- GridCommunicationClient[] curClients = clients.get(nodeId);
+ while (true) {
+ GridCommunicationClient[] curClients = clients.get(nodeId);
+
+ GridCommunicationClient client = curClients != null && connIdx
< curClients.length ?
+ curClients[connIdx] : null;
- GridCommunicationClient client = curClients != null && connIdx <
curClients.length ?
- curClients[connIdx] : null;
+ if (client == null) {
+ if (stopping)
+ throw new IgniteSpiException("Node is stopping.");
- if (client == null) {
- if (stopping)
- throw new IgniteSpiException("Node is stopping.");
+ // Do not allow concurrent connects.
+ GridFutureAdapter<GridCommunicationClient> fut = new
ConnectFuture();
- // Do not allow concurrent connects.
- GridFutureAdapter<GridCommunicationClient> fut = new
ConnectFuture();
+ ConnectionKey connKey = new ConnectionKey(nodeId, connIdx,
-1);
- ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+ GridFutureAdapter<GridCommunicationClient> oldFut =
clientFuts.putIfAbsent(connKey, fut);
- GridFutureAdapter<GridCommunicationClient> oldFut =
clientFuts.putIfAbsent(connKey, fut);
+ if (oldFut == null) {
+ try {
+ GridCommunicationClient[] curClients0 =
clients.get(nodeId);
- if (oldFut == null) {
- try {
- GridCommunicationClient[] curClients0 =
clients.get(nodeId);
+ GridCommunicationClient client0 = curClients0 !=
null && connIdx < curClients0.length ?
+ curClients0[connIdx] : null;
- GridCommunicationClient client0 = curClients0 != null
&& connIdx < curClients0.length ?
- curClients0[connIdx] : null;
+ if (client0 == null) {
+ client0 = createCommunicationClient(node,
connIdx);
- if (client0 == null) {
- client0 = createCommunicationClient(node, connIdx);
+ if (client0 != null) {
+ addNodeClient(node, connIdx, client0);
- if (client0 != null) {
- addNodeClient(node, connIdx, client0);
+ if (client0 instanceof
GridTcpNioCommunicationClient) {
+ GridTcpNioCommunicationClient
tcpClient = ((GridTcpNioCommunicationClient)client0);
- if (client0 instanceof
GridTcpNioCommunicationClient) {
- GridTcpNioCommunicationClient tcpClient =
((GridTcpNioCommunicationClient)client0);
+ if (tcpClient.session().closeTime() >
0 && removeNodeClient(nodeId, client0)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Session was closed
after client creation, will retry " +
+ "[node=" + node + ",
client=" + client0 + ']');
+ }
- if (tcpClient.session().closeTime() > 0 &&
removeNodeClient(nodeId, client0)) {
- if (log.isDebugEnabled()) {
- log.debug("Session was closed
after client creation, will retry " +
- "[node=" + node + ", client="
+ client0 + ']');
+ client0 = null;
}
-
- client0 = null;
}
}
- }
- else {
- U.sleep(200);
+ else {
+ U.sleep(200);
- if (nodeGetter.apply(node.id()) == null)
- throw new
ClusterTopologyCheckedException("Failed to send message " +
- "(node left topology): " + node);
+ if (nodeGetter.apply(node.id()) == null)
+ throw new
ClusterTopologyCheckedException("Failed to send message " +
+ "(node left topology): " + node);
+ }
}
- }
- fut.onDone(client0);
- }
- catch (NodeUnreachableException e) {
- log.warning(e.getMessage());
+ fut.onDone(client0);
+ }
+ catch (NodeUnreachableException e) {
+ log.warning(e.getMessage());
- fut = handleUnreachableNodeException(node, connIdx,
fut, e);
- }
- catch (Throwable e) {
- if (e instanceof NodeUnreachableException)
- throw e;
+ fut = handleUnreachableNodeException(node,
connIdx, fut, e);
+ }
+ catch (Throwable e) {
+ if (e instanceof NodeUnreachableException)
+ throw e;
- fut.onDone(e);
+ fut.onDone(e);
- if (e instanceof IgniteTooManyOpenFilesException)
- throw e;
+ if (e instanceof IgniteTooManyOpenFilesException)
+ throw e;
- if (e instanceof Error)
- throw (Error)e;
- }
- finally {
- clientFuts.remove(connKey, fut);
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ finally {
+ clientFuts.remove(connKey, fut);
+ }
}
- }
- else
- fut = oldFut;
+ else
+ fut = oldFut;
- long clientReserveWaitTimeout = registry != null ?
registry.getSystemWorkerBlockedTimeout() / 3
- : cfg.connectionTimeout() / 3;
+ long clientReserveWaitTimeout = registry != null ?
registry.getSystemWorkerBlockedTimeout() / 3
+ : cfg.connectionTimeout() / 3;
- long currTimeout = System.currentTimeMillis();
+ long currTimeout = System.currentTimeMillis();
- // This cycle will eventually quit when future is completed by
concurrent thread reserving client.
- while (true) {
- try {
- client = fut.get(clientReserveWaitTimeout,
TimeUnit.MILLISECONDS);
+ // This cycle will eventually quit when future is
completed by concurrent thread reserving client.
+ while (true) {
+ try {
+ client = fut.get(clientReserveWaitTimeout,
TimeUnit.MILLISECONDS);
- break;
- }
- catch (IgniteFutureTimeoutCheckedException ignored) {
- currTimeout += clientReserveWaitTimeout;
-
- if (log.isDebugEnabled()) {
- log.debug(
- "Still waiting for reestablishing connection
to node " +
- "[nodeId=" + node.id() + ", waitingTime="
+ currTimeout + "ms]"
- );
+ break;
}
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ currTimeout += clientReserveWaitTimeout;
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Still waiting for reestablishing
connection to node " +
+ "[nodeId=" + node.id() + ",
waitingTime=" + currTimeout + "ms]"
+ );
+ }
- if (registry != null) {
- GridWorker wrkr =
registry.worker(Thread.currentThread().getName());
+ if (registry != null) {
+ GridWorker wrkr =
registry.worker(Thread.currentThread().getName());
- if (wrkr != null)
- wrkr.updateHeartbeat();
+ if (wrkr != null)
+ wrkr.updateHeartbeat();
+ }
}
}
- }
- if (client == null) {
- if (clusterStateProvider.isLocalNodeDisconnected())
- throw new IgniteCheckedException("Unable to create TCP
client due to local node disconnecting.");
- else
- continue;
+ if (client == null) {
+ if (clusterStateProvider.isLocalNodeDisconnected())
+ throw new IgniteCheckedException("Unable to create
TCP client due to local node disconnecting.");
+ else
+ continue;
+ }
+
+ if (nodeGetter.apply(nodeId) == null) {
+ if (removeNodeClient(nodeId, client))
+ client.forceClose();
+
+ throw new IgniteSpiException("Destination node is not
in topology: " + node.id());
+ }
}
- if (nodeGetter.apply(nodeId) == null) {
- if (removeNodeClient(nodeId, client))
- client.forceClose();
+ assert connIdx == client.connectionIndex() : client;
- throw new IgniteSpiException("Destination node is not in
topology: " + node.id());
+ if (client.reserve()) {
+ updateClientAcquiredMetric(client);
+
+ return client;
}
+ else
+ // Client has just been closed by idle worker. Help it and
try again.
+ removeNodeClient(nodeId, client);
}
+ }
+ finally {
+ if (nodeMetrics != null)
+ nodeMetrics.acquiringThreadsCnt.decrementAndGet();
+ }
+ }
- assert connIdx == client.connectionIndex() : client;
+ /** */
+ private void updateClientAcquiredMetric(GridCommunicationClient client) {
+ if (asyncMetric == null) {
+ synchronized (metrics) {
+ if (asyncMetric == null) {
+ MetricRegistryImpl mreg =
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
- if (client.reserve())
- return client;
- else
- // Client has just been closed by idle worker. Help it and try
again.
- removeNodeClient(nodeId, client);
+ // We assume that all the clients have the same async flag.
+ asyncMetric = new AtomicBoolean(client.async());
+
+ mreg.register(METRIC_NAME_ASYNC_CONNS, () ->
asyncMetric.get(), "Asynchronous flag. If TRUE, " +
+ "connections put data in a queue (with some
preprocessing) instead of immediate sending.");
+ }
+ }
}
+ else
+ assert client.async() == asyncMetric.get();
}
/**
@@ -514,7 +628,19 @@ public class ConnectionClientPool {
newClients = new
GridCommunicationClient[cfg.connectionsPerNode()];
newClients[connIdx] = addClient;
- if (clients.putIfAbsent(node.id(), newClients) == null)
+ curClients = clients.compute(node.id(), (nodeId0, clients0) ->
{
+ if (clients0 == null) {
+ // Syncs metrics creation on this map.
+ if (metricsMgr != null)
+ createNodeMetrics(node);
+
+ return newClients;
+ }
+
+ return clients0;
+ });
+
+ if (curClients != null)
break;
}
else {
@@ -530,15 +656,108 @@ public class ConnectionClientPool {
}
}
+ /** */
+ private void createNodeMetrics(ClusterNode node) {
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.id()));
+
+ assert !mreg.iterator().hasNext() : "Node connection pools metrics
aren't empty.";
+
+ mreg.register(METRIC_NAME_CONSIST_ID, () ->
node.consistentId().toString(), String.class,
+ "Consistent id of the remote node as string.");
+
+ mreg.register(METRIC_NAME_CUR_CNT, () ->
updatedNodeMetrics(node.id()).connsCnt,
+ "Number of current connections to the remote node.");
+
+ mreg.register(METRIC_NAME_MSG_QUEUE_SIZE, () ->
updatedNodeMetrics(node.id()).msgsQueueSize,
+ "Overal number of pending messages to the remote node.");
+
+ mreg.register(METRIC_NAME_MAX_NET_IDLE_TIME, () ->
updatedNodeMetrics(node.id()).maxIdleTime,
+ "Maximal idle time of physical sending or receiving data in
milliseconds.");
+
+ mreg.register(METRIC_NAME_AVG_LIFE_TIME, () ->
updatedNodeMetrics(node.id()).avgLifetime,
+ "Average connection lifetime in milliseconds.");
+
+ mreg.register(METRIC_NAME_REMOVED_CNT, () ->
updatedNodeMetrics(node.id()).removedConnectionsCnt.get(),
+ "Total number of removed connections.");
+
+ mreg.register(METRIC_NAME_ACQUIRING_THREADS_CNT, () ->
updatedNodeMetrics(node.id()).acquiringThreadsCnt.get(),
+ "Number of threads currently acquiring a connection.");
+ }
+
+ /** */
+ private NodeMetrics updatedNodeMetrics(UUID nodeId) {
+ long nowNanos = System.nanoTime();
+
+ NodeMetrics res = metrics.get(nodeId);
+
+ if (res == null || (nowNanos - res.updateTs > METRICS_UPDATE_THRESHOLD
&& res.canUpdate())) {
+ GridCommunicationClient[] nodeClients = clients.get(nodeId);
+
+ // The node might have already left the cluster.
+ if (nodeClients != null) {
+ long nowMillis = U.currentTimeMillis();
+
+ res = new NodeMetrics(res);
+
+ long avgLifetime = 0;
+ long maxIdleTime = 0;
+
+ for (GridCommunicationClient nodeClient : nodeClients) {
+ if (nodeClient == null)
+ continue;
+
+ ++res.connsCnt;
+
+ avgLifetime += nowMillis - nodeClient.creationTime();
+
+ long nodeIdleTime = nodeClient.getIdleTime();
+
+ if (nodeIdleTime > maxIdleTime)
+ maxIdleTime = nodeIdleTime;
+
+ res.msgsQueueSize += nodeClient.messagesQueueSize();
+ }
+
+ if (res.connsCnt != 0)
+ res.avgLifetime = avgLifetime / res.connsCnt;
+
+ res.maxIdleTime = maxIdleTime;
+
+ NodeMetrics res0 = res;
+
+ res.updateTs = System.nanoTime();
+
+ // The node might have already left the cluster. Synchronizes metrics removal
on the clients map.
+ clients.compute(nodeId, (nodeId0, clients) -> {
+ if (clients == null)
+ removeNodeMetrics(nodeId);
+ else
+ metrics.put(nodeId, res0);
+
+ return clients;
+ });
+ }
+ else {
+ removeNodeMetrics(nodeId);
+
+ res = null;
+ }
+ }
+
+ return res == null ? NodeMetrics.EMPTY : res;
+ }
+
+ /** */
+ public static String nodeMetricsRegName(UUID nodeId) {
+ return metricName(SHARED_METRICS_REGISTRY_NAME, nodeId.toString());
+ }
+
/**
* @param nodeId Node ID.
* @param rmvClient Client to remove.
* @return {@code True} if client was removed.
*/
public boolean removeNodeClient(UUID nodeId, GridCommunicationClient
rmvClient) {
- if (log.isDebugEnabled())
- log.debug("The client was removed [nodeId=" + nodeId + ",
client=" + rmvClient.toString() + "].");
-
for (; ; ) {
GridCommunicationClient[] curClients = clients.get(nodeId);
@@ -551,8 +770,17 @@ public class ConnectionClientPool {
newClients[rmvClient.connectionIndex()] = null;
- if (clients.replace(nodeId, curClients, newClients))
+ if (clients.replace(nodeId, curClients, newClients)) {
+ NodeMetrics nodeMetrics = metrics.get(nodeId);
+
+ if (nodeMetrics != null && nodeMetrics != NodeMetrics.EMPTY)
+ nodeMetrics.removedConnectionsCnt.addAndGet(1);
+
+ if (log.isDebugEnabled())
+ log.debug("The client was removed [nodeId=" + nodeId + ",
client=" + rmvClient.toString() + "].");
+
return true;
+ }
}
}
@@ -567,6 +795,8 @@ public class ConnectionClientPool {
if (log.isDebugEnabled())
log.debug("The node client connections were closed [nodeId=" +
nodeId + "]");
+ removeNodeMetrics(nodeId);
+
GridCommunicationClient[] clients = this.clients.remove(nodeId);
if (nonNull(clients)) {
for (GridCommunicationClient client : clients)
@@ -589,6 +819,8 @@ public class ConnectionClientPool {
public void onNodeLeft(UUID nodeId) {
GridCommunicationClient[] clients0 = clients.remove(nodeId);
+ removeNodeMetrics(nodeId);
+
if (clients0 != null) {
for (GridCommunicationClient client : clients0) {
if (client != null) {
@@ -603,6 +835,13 @@ public class ConnectionClientPool {
}
}
+ /** Removes the node's metric registry from the metric manager and drops the node's entry from {@code metrics}. */
+ private void removeNodeMetrics(UUID nodeId) {
+ metricsMgr.remove(nodeMetricsRegName(nodeId));
+
+ metrics.remove(nodeId);
+ }
+
/**
* @param id Id.
*/
@@ -669,4 +908,57 @@ public class ConnectionClientPool {
public void metricsListener(@Nullable TcpCommunicationMetricsListener
metricsLsnr) {
this.metricsLsnr = metricsLsnr;
}
+
+ /** Connection pool metric values kept for a single remote node. */
+ private static final class NodeMetrics {
+ /** Avoids NPEs on metrics getting because nodes leave cluster
asynchronously. */
+ private static final NodeMetrics EMPTY = new NodeMetrics(null);
+
+ /** {@link System#nanoTime()} value taken at creation and again after each recalculation of the values. */
+ private volatile long updateTs = System.nanoTime();
+
+ /** Raised by {@link #canUpdate()}; once raised, further {@link #canUpdate()} calls return {@code false}. */
+ private volatile boolean updatingFlag;
+
+ /** Number of non-null clients of the node counted by the last recalculation. */
+ private int connsCnt;
+
+ /** Sum of {@code messagesQueueSize()} over the node's clients. */
+ private int msgsQueueSize;
+
+ /** Largest {@code getIdleTime()} among the node's clients; inherited from the previous instance until recalculated. */
+ private long maxIdleTime;
+
+ /** Mean of (now - {@code creationTime()}) over the node's clients; inherited from the previous instance until recalculated. */
+ private long avgLifetime;
+
+ /** Counter of removed clients; the same object is shared with the previous instance, so the count is not reset. */
+ private final AtomicLong removedConnectionsCnt;
+
+ /** Shared with the previous instance; presumably the number of threads acquiring a connection now (updated outside this class). */
+ private final AtomicInteger acquiringThreadsCnt;
+
+ /** @param prev Metrics to inherit the counters and last computed times from, or {@code null} to start from zero. */
+ private NodeMetrics(@Nullable NodeMetrics prev) {
+ this.removedConnectionsCnt = prev == null ? new AtomicLong() :
prev.removedConnectionsCnt;
+ this.acquiringThreadsCnt = prev == null ? new AtomicInteger() :
prev.acquiringThreadsCnt;
+ this.avgLifetime = prev == null ? 0 : prev.avgLifetime;
+ this.maxIdleTime = prev == null ? 0 : prev.maxIdleTime;
+ }
+
+ /** @return {@code true} if this call raised {@code updatingFlag}; {@code false} if it was already raised. */
+ private boolean canUpdate() {
+ if (updatingFlag)
+ return false;
+
+ synchronized (this) {
+ if (updatingFlag)
+ return false;
+
+ updatingFlag = true;
+
+ return true;
+ }
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
index afdad6661a1..e211df235c6 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
@@ -17,24 +17,32 @@
package org.apache.ignite.spi.communication.tcp.internal;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Round robin connection policy.
*/
public class RoundRobinConnectionPolicy implements ConnectionPolicy {
- /** Config. */
- private final TcpCommunicationConfiguration cfg;
+ /** Maximal connections number. */
+ private final int cnt;
+
+ /** Call counter; its remainder modulo {@code cnt} is the next connection index. */
+ private final AtomicInteger counter = new AtomicInteger();
/**
- * @param cfg Config.
+ * @param cnt Maximal connections number.
*/
- public RoundRobinConnectionPolicy(TcpCommunicationConfiguration cfg) {
- this.cfg = cfg;
+ public RoundRobinConnectionPolicy(int cnt) {
+ this.cnt = cnt;
}
/** {@inheritDoc} */
@Override public int connectionIndex() {
- return (int)(U.safeAbs(Thread.currentThread().getId()) %
cfg.connectionsPerNode());
+ int idx = counter.getAndIncrement() % cnt;
+
+ if (idx < 0) // The int counter wrapped to negative, so the remainder is negative: shift it back into [0, cnt).
+ idx += cnt;
+
+ return idx;
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
index 6a04aeda125..b1a26b41b4a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
@@ -104,6 +104,11 @@ public class MockNioSession extends
GridMetadataAwareAdapter implements GridNioS
return 0;
}
+ /** {@inheritDoc} */
+ @Override public int messagesQueueSize() {
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> close() {
return new GridFinishedFuture<>(true);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
new file mode 100644
index 00000000000..a4eb483d5e9
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.BooleanGauge;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.GridTestMessage;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
+import org.apache.ignite.spi.metric.IntMetric;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.Metric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_ACQUIRING_THREADS_CNT;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_AVG_LIFE_TIME;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_CONSIST_ID;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_CUR_CNT;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MAX_NET_IDLE_TIME;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MSG_QUEUE_SIZE;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_REMOVED_CNT;
+import static
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.nodeMetricsRegName;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Tests metrics of {@link ConnectionClientPool}. */
+@RunWith(Parameterized.class)
+public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTest {
+ /** Minimal number of load threads; also the acquiring-threads value awaited by the acquiring test. */
+ private static final int MIN_LOAD_THREADS = 2;
+
+ /** Idle connection timeout set on the communication SPI of nodes configured by this test. */
+ private volatile long maxConnIdleTimeout =
TcpCommunicationSpi.DFLT_IDLE_CONN_TIMEOUT;
+
+ /** Delay passed to {@code U.sleep} before each TCP client creation; no delay if not positive. */
+ private volatile int createClientDelay;
+
+ /** Connections per node. */
+ @Parameterized.Parameter(0)
+ public int connsPerNode;
+
+ /** Whether paired connections are used. */
+ @Parameterized.Parameter(1)
+ public boolean pairedConns;
+
+ /** Message queue limit. */
+ @Parameterized.Parameter(2)
+ public int msgQueueLimit;
+
+ /** If {@code true}, the client node generates the load; otherwise the server returned by the grid start does. */
+ @Parameterized.Parameter(3)
+ public boolean clientLdr;
+
+ /** @return Cartesian product of the values of all four test parameters. */
+ @Parameterized.Parameters(name = "connsPerNode={0}, pairedConns={1},
msgQueueLimit={2}, clientLdr={3}")
+ public static Collection<Object[]> params() {
+ return GridTestUtils.cartesianProduct(
+ F.asList(1, 4), // Connections per node.
+ F.asList(false, true), // Paired connections.
+ F.asList(0, 100), // Message queue limit.
+ F.asList(true, false) // Use client as a load.
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpCommunicationSpi communicationSpi = new TcpCommunicationSpi() {
+ @Override protected GridCommunicationClient
createTcpClient(ClusterNode node,
+ int connIdx) throws IgniteCheckedException {
+ if (createClientDelay > 0)
+ U.sleep(createClientDelay);
+
+ return super.createTcpClient(node, connIdx);
+ }
+ };
+
+ communicationSpi.setConnectionsPerNode(connsPerNode)
+ .setUsePairedConnections(pairedConns)
+ .setIdleConnectionTimeout(maxConnIdleTimeout)
+ .setMessageQueueLimit(msgQueueLimit);
+
+ cfg.setCommunicationSpi(communicationSpi);
+
+ cfg.setPluginProviders(new TestCommunicationMessagePluginProvider());
+
+ return cfg;
+ }
+
+ /** Checks that, with a short idle timeout, the removed-connections metric reaches the pool size after the load stops. */
+ @Test
+ public void testRemovedConnectionMetrics() throws Exception {
+ maxConnIdleTimeout = 500;
+
+ Ignite srvr = startGridsMultiThreaded(2);
+ Ignite cli = startClientGrid(G.allGrids().size());
+
+ Ignite ldr = clientLdr ? cli : srvr;
+
+ GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+ AtomicBoolean runFlag = new AtomicBoolean(true);
+ TestMessage msg = new TestMessage();
+
+ IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg,
null);
+
+ // Wait until all connections are created and used.
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+ assertTrue(waitForCondition(
+ () -> {
+ IntMetric m = mreg.findMetric(METRIC_NAME_CUR_CNT);
+
+ return m != null && m.value() == connsPerNode;
+ },
+ getTestTimeout())
+ );
+ }
+
+ // Ensure that the loading is ok.
+ assertTrue(runFlag.get());
+
+ runFlag.set(false);
+ loadFut.get(getTestTimeout());
+
+ // Wait until connections are closed.
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+ assertTrue(waitForCondition(() ->
mreg.<LongMetric>findMetric(METRIC_NAME_REMOVED_CNT).value() >= connsPerNode,
+ getTestTimeout()));
+ }
+
+ dumpMetrics(ldr);
+ }
+
+ /** Checks that the removed-connections metric keeps growing when load pauses are not shorter than the idle timeout. */
+ @Test
+ public void testIdleRemovedConnectionMetricsUnderLazyLoad() throws
Exception {
+ maxConnIdleTimeout = 10;
+
+ Ignite srvr = startGridsMultiThreaded(2);
+ Ignite cli = startClientGrid(G.allGrids().size());
+
+ Ignite ldr = clientLdr ? cli : srvr;
+
+ GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+ AtomicBoolean runFlag = new AtomicBoolean(true);
+ Message msg = new TestMessage();
+
+ IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg,
null, maxConnIdleTimeout, maxConnIdleTimeout * 4);
+
+ // Wait until at least four pool sizes' worth of connections have been removed.
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+ assertTrue(waitForCondition(
+ () -> {
+ LongMetric m = mreg.findMetric(METRIC_NAME_REMOVED_CNT);
+
+ return m != null && m.value() >= connsPerNode * 4L;
+ },
+ getTestTimeout())
+ );
+ }
+
+ // Ensure that the loading is ok.
+ assertTrue(runFlag.get());
+
+ runFlag.set(false);
+ loadFut.get(getTestTimeout());
+
+ dumpMetrics(ldr);
+ }
+
+ /** Checks shared pool metrics, per-node metrics under load, and that per-node metrics disappear when a node stops. */
+ @Test
+ public void testMetricsBasics() throws Exception {
+ int preloadCnt = 300;
+ int srvrCnt = 3;
+
+ Ignite srvr = startGridsMultiThreaded(srvrCnt);
+ Ignite cli = startClientGrid(G.allGrids().size());
+
+ Ignite ldr = clientLdr ? cli : srvr;
+
+ GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+ MetricRegistryImpl mreg0 =
metricsMgr.registry(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME);
+
+ assertEquals(connsPerNode,
mreg0.<IntMetric>findMetric(ConnectionClientPool.METRIC_NAME_POOL_SIZE).value());
+ assertEquals(pairedConns,
mreg0.<BooleanGauge>findMetric(ConnectionClientPool.METRIC_NAME_PAIRED_CONNS).value());
+
+ AtomicBoolean runFlag = new AtomicBoolean(true);
+ AtomicLong loadCnt = new AtomicLong(preloadCnt);
+ TestMessage msg = new TestMessage();
+
+ long loadMillis0 = System.currentTimeMillis();
+
+ IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg,
loadCnt);
+
+ assertTrue(waitForCondition(() -> loadCnt.get() <= 0 ||
!runFlag.get(), getTestTimeout(), 25));
+
+ long loadMillis1 = System.currentTimeMillis() - loadMillis0;
+
+ // Ensure that preloaded without a failure.
+ assertTrue(runFlag.get());
+
+ long checkPeriod =
U.nanosToMillis(ConnectionClientPool.METRICS_UPDATE_THRESHOLD / 3);
+
+ // Check metrics.
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ UUID nodeId = node.cluster().localNode().id();
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(nodeId));
+
+ // We assume that entire pool was used at least once.
+ assertTrue(waitForCondition(() -> connsPerNode ==
mreg.<IntMetric>findMetric(METRIC_NAME_CUR_CNT).value(),
+ getTestTimeout(), checkPeriod));
+
+ // Connections should not be idle under a heavy load.
+ assertTrue(waitForCondition(() ->
mreg.<LongMetric>findMetric(METRIC_NAME_MAX_NET_IDLE_TIME).value() < 50,
+ getTestTimeout(), checkPeriod));
+
+ assertTrue(waitForCondition(() ->
mreg.<LongMetric>findMetric(METRIC_NAME_AVG_LIFE_TIME).value() > loadMillis1,
+ getTestTimeout(), checkPeriod));
+
+ // Default connection idle and write timeouts are large enough.
Connections should not be failed/deleted.
+ assertEquals(0,
mreg.<LongMetric>findMetric(METRIC_NAME_REMOVED_CNT).value());
+
+ assertEquals(node.cluster().localNode().consistentId().toString(),
mreg.findMetric(METRIC_NAME_CONSIST_ID).getAsString());
+ }
+
+ // Current connection implementations are async.
+ assertEquals(true,
mreg0.<BooleanGauge>findMetric(ConnectionClientPool.METRIC_NAME_ASYNC_CONNS).value());
+
+ dumpMetrics(ldr);
+
+ // Check node metrics are cleared if a node stops.
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ // Keep last server node.
+ if (!node.cluster().localNode().isClient() && --srvrCnt == 0)
+ break;
+
+ UUID nodeId = node.cluster().localNode().id();
+
+ assertTrue(G.stop(node.name(), true));
+
+ assertTrue(waitForCondition(() -> {
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(nodeId));
+
+ return mreg == null || !mreg.iterator().hasNext();
+ }, getTestTimeout()));
+ }
+
+ runFlag.set(false);
+ loadFut.get(getTestTimeout());
+
+ // Ensure that all the possible nodes are stopped.
+ assertTrue(waitForCondition(() -> ldr.cluster().nodes().size() ==
(clientLdr ? 2 : 1), getTestTimeout()));
+ }
+
+ /** Delays client creation under concurrent load and waits until the acquiring-threads metric reaches {@code MIN_LOAD_THREADS}. */
+ @Test
+ public void testAcquiringThreadsCntMetric() throws Exception {
+ // Forces quick connection removing and recreating.
+ maxConnIdleTimeout = 1;
+ createClientDelay = 50;
+
+ Ignite srvr = startGridsMultiThreaded(2);
+ Ignite cli = startClientGrid(G.allGrids().size());
+
+ Ignite ldr = clientLdr ? cli : srvr;
+
+ GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+ AtomicBoolean runFlag = new AtomicBoolean(true);
+
+ IgniteInternalFuture<?> monFut = GridTestUtils.runAsync(() -> {
+ while (runFlag.get()) {
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+ IntMetric m =
mreg.findMetric(METRIC_NAME_ACQUIRING_THREADS_CNT);
+
+ if (m != null && m.value() >= MIN_LOAD_THREADS)
+ runFlag.set(false);
+ }
+
+ U.sleep(1);
+ }
+ });
+
+ IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> new
TestMessage((int)maxConnIdleTimeout * 3), null);
+
+ monFut.get(getTestTimeout());
+
+ createClientDelay = 0;
+
+ loadFut.get(getTestTimeout());
+ }
+
+ /** Checks that the message queue size metric grows once writing of each message is artificially delayed. */
+ @Test
+ public void testPendingMessagesMetric() throws Exception {
+ int preloadCnt = 500;
+
+ Ignite server = startGridsMultiThreaded(2);
+ Ignite client = startClientGrid(G.allGrids().size());
+
+ Ignite ldr = clientLdr ? client : server;
+
+ GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+ AtomicBoolean runFlag = new AtomicBoolean(true);
+ AtomicLong loadCnt = new AtomicLong(preloadCnt);
+
+ AtomicInteger writeDelay = new AtomicInteger();
+
+ IgniteInternalFuture<?> loadFut = runLoad(
+ ldr,
+ runFlag,
+ () -> new TestMessage(writeDelay.get()),
+ loadCnt
+ );
+
+ assertTrue(waitForCondition(() -> loadCnt.get() <= 0 ||
!runFlag.get(), getTestTimeout(), 25));
+
+ // Ensure that preloaded without a failure.
+ assertTrue(runFlag.get());
+
+ // Will delay message queue processing but not network i/o.
+ writeDelay.set(50);
+
+ long checkPeriod =
U.nanosToMillis(ConnectionClientPool.METRICS_UPDATE_THRESHOLD / 3);
+
+ // Check metrics.
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+ assertTrue(waitForCondition(
+ () ->
mreg.<IntMetric>findMetric(METRIC_NAME_MSG_QUEUE_SIZE).value() >= Math.max(10,
msgQueueLimit),
+ getTestTimeout(), checkPeriod)
+ );
+ }
+
+ writeDelay.set(0);
+
+ dumpMetrics(ldr);
+
+ runFlag.set(false);
+ loadFut.get(getTestTimeout());
+ }
+
+ /** Starts threads sending messages from {@code ldrNode} to all other nodes until the flag is cleared or a send fails. */
+ private IgniteInternalFuture<?> runLoad(
+ Ignite ldrNode,
+ AtomicBoolean keepLoadingFlag,
+ Supplier<Message> msgSupplier,
+ @Nullable AtomicLong preloadCnt,
+ long minIterationDelayMs,
+ long maxIterationDelayMs
+ ) {
+ assert minIterationDelayMs >= 0 && maxIterationDelayMs >=
minIterationDelayMs;
+
+ GridIoManager io = ((IgniteEx)ldrNode).context().io();
+
+ return GridTestUtils.runMultiThreadedAsync(() -> {
+ try {
+ while (keepLoadingFlag.get()) {
+ for (Ignite node : G.allGrids()) {
+ if (node == ldrNode)
+ continue;
+
+ Message msg = msgSupplier.get();
+
+ io.sendToCustomTopic(node.cluster().localNode().id(),
"tt", msg, GridIoPolicy.PUBLIC_POOL);
+ }
+
+ if (preloadCnt != null && preloadCnt.get() > 0)
+ preloadCnt.decrementAndGet();
+
+ if (maxIterationDelayMs > 0)
+
U.sleep(ThreadLocalRandom.current().nextLong(minIterationDelayMs,
maxIterationDelayMs + 1));
+ }
+ }
+ catch (Throwable ignored) {
+ // Stop all loaders on any failure; the tests detect it by the cleared flag.
+ keepLoadingFlag.set(false);
+ }
+ }, Math.max(MIN_LOAD_THREADS, connsPerNode + connsPerNode / 2),
"testLoader");
+ }
+
+ /** Same as the six-argument {@code runLoad} with the iteration delay bounds fixed to 1 and 3. */
+ private IgniteInternalFuture<?> runLoad(
+ Ignite ldrNode,
+ AtomicBoolean keepLoadingFlag,
+ Supplier<Message> msgSupplier,
+ @Nullable AtomicLong preloadCnt
+ ) {
+ return runLoad(ldrNode, keepLoadingFlag, msgSupplier, preloadCnt, 1,
3);
+ }
+
+ /** Logs, at info level, every metric of the per-node pool registries that {@code ldr} keeps for the other nodes. */
+ private static void dumpMetrics(Ignite ldr) {
+ if (!log.isInfoEnabled())
+ return;
+
+ GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+
+ for (Ignite node : G.allGrids()) {
+ if (node == ldr)
+ continue;
+
+ MetricRegistryImpl mreg =
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+ StringBuilder b = new StringBuilder()
+ .append("Pool metrics from node
").append(ldr.cluster().localNode().order())
+ .append(" to node ").append(node.cluster().localNode().order())
+ .append(": ");
+
+ for (Metric m : mreg) {
+ b.append(System.lineSeparator()).append('\t');
+
+ b.append(m.name()).append(" = ").append(m.getAsString());
+ }
+
+ log.info(b.toString());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 120 * 1000;
+ }
+
+ /** Plugin provider that registers {@link TestMessage} in the message factory under {@code TestMessage.DIRECT_TYPE}. */
+ public static class TestCommunicationMessagePluginProvider extends
AbstractTestPluginProvider {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "TEST_PLUGIN";
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
+ registry.registerExtension(MessageFactoryProvider.class, new
MessageFactoryProvider() {
+ @Override public void registerAll(MessageFactory factory) {
+ factory.register(TestMessage.DIRECT_TYPE,
TestMessage::new);
+ }
+ });
+ }
+ }
+
+ /** Test message that can sleep at the start of {@code writeTo} to slow down writing. */
+ private static class TestMessage extends GridTestMessage {
+ /** Delay passed to {@code U.sleep} on each {@code writeTo} call; no sleep if not positive. */
+ private final int writeDelay;
+
+ /** @param writeDelay Delay applied on each {@code writeTo} call. */
+ public TestMessage(int writeDelay) {
+ this.writeDelay = writeDelay;
+ }
+
+ /** Creates a message without a write delay. */
+ public TestMessage() {
+ this(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
+ if (writeDelay > 0) {
+ try {
+ U.sleep(writeDelay);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ // No-op.
+ }
+ }
+
+ return super.writeTo(buf, writer);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
index f2b6bafccdf..ee665ca8a60 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -19,11 +19,13 @@ package org.apache.ignite.spi.communication.tcp;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
@@ -31,6 +33,7 @@ import
org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -136,6 +139,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest
extends GridCommonAbstr
CommunicationSpi commSpi =
srcNode.configuration().getCommunicationSpi();
ConcurrentMap<UUID, GridCommunicationClient[]> clients =
GridTestUtils.getFieldValue(commSpi, "clientPool", "clients");
+ GridMetricManager metricMgr = GridTestUtils.getFieldValue(commSpi,
"clientPool", "metricsMgr");
ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs =
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "recoveryDescs");
ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs =
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "outRecDescs");
ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs =
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "inRecDescs");
@@ -160,6 +164,16 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest
extends GridCommonAbstr
// uninformed and force ping old connection.
GridCommunicationClient[] clients0 =
clients.remove(targetNode.cluster().localNode().id());
+ if (metricMgr != null) {
+ Map<UUID, ?> metrics = GridTestUtils.getFieldValue(commSpi,
"clientPool", "metrics");
+
+ assert metrics != null;
+
+ metrics.remove(targetNode.cluster().localNode().id());
+
+
metricMgr.remove(ConnectionClientPool.nodeMetricsRegName(targetNode.cluster().localNode().id()));
+ }
+
for (GridCommunicationClient commClient : clients0)
lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new
IOException("Test exception"));
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index e231898adf6..1029d060c2b 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.util.nio.TcpCommunicationSpiSslVolatilePayloadTest;
import org.apache.ignite.spi.communication.tcp.ClientExceptionsUtilsTest;
+import
org.apache.ignite.spi.communication.tcp.CommunicationConnectionPoolMetricsTest;
import org.apache.ignite.spi.communication.tcp.GridCacheDhtLockBackupSelfTest;
import
org.apache.ignite.spi.communication.tcp.GridSandboxedClientWithoutNetworkTest;
import
org.apache.ignite.spi.communication.tcp.GridTcpCommunicationInverseConnectionEstablishingTest;
@@ -100,6 +101,8 @@ import org.junit.runners.Suite;
TcpCommunicationStatisticsTest.class,
+ CommunicationConnectionPoolMetricsTest.class,
+
IgniteTcpCommunicationHandshakeWaitTest.class,
IgniteTcpCommunicationHandshakeWaitSslTest.class,
IgniteTcpCommunicationConnectOnInitTest.class,
diff --git
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
index 0f65306eac6..b45b79ab2fa 100644
---
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
+++
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
@@ -774,11 +774,21 @@ class ZookeeperDiscoverySpiTestBase extends
GridCommonAbstractTest {
delegate.release();
}
+ /** {@inheritDoc} */
+ @Override public long creationTime() {
+ return delegate.creationTime();
+ }
+
/** {@inheritDoc} */
@Override public long getIdleTime() {
return delegate.getIdleTime();
}
+ /** {@inheritDoc} */
+ @Override public int messagesQueueSize() {
+ return delegate.messagesQueueSize();
+ }
+
/** {@inheritDoc} */
@Override public void sendMessage(ByteBuffer data) throws
IgniteCheckedException {
if (failure && !matrix.hasConnection(locNode, remoteNode))