This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f088e3fed9d IGNITE-26179 Added metrics of TCP communication connection 
pool (#12271)
f088e3fed9d is described below

commit f088e3fed9d404ee0b64cc1de9e6ba7eea6ef1db
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Oct 16 09:48:30 2025 +0300

    IGNITE-26179 Added metrics of TCP communication connection pool (#12271)
---
 .../internal/util/nio/GridCommunicationClient.java |  14 +
 .../ignite/internal/util/nio/GridNioServer.java    |   5 +-
 .../ignite/internal/util/nio/GridNioSession.java   |   7 +
 .../internal/util/nio/GridNioSessionImpl.java      |   9 +-
 .../util/nio/GridSelectorNioSessionImpl.java       |   8 +-
 .../util/nio/GridTcpNioCommunicationClient.java    |  16 +-
 .../tcp/TcpCommunicationMetricsListener.java       |   6 +-
 .../spi/communication/tcp/TcpCommunicationSpi.java |  17 +-
 .../tcp/internal/ConnectionClientPool.java         | 516 ++++++++++++++-----
 .../tcp/internal/RoundRobinConnectionPolicy.java   |  22 +-
 .../internal/util/nio/impl/MockNioSession.java     |   5 +
 .../CommunicationConnectionPoolMetricsTest.java    | 556 +++++++++++++++++++++
 ...cpCommunicationSpiHalfOpenedConnectionTest.java |  14 +
 .../IgniteSpiCommunicationSelfTestSuite.java       |   3 +
 .../zk/internal/ZookeeperDiscoverySpiTestBase.java |  10 +
 15 files changed, 1069 insertions(+), 139 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index b4a9a954731..514dedf6d21 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -75,6 +75,20 @@ public interface GridCommunicationClient {
      */
     public long getIdleTime();
 
+    /**
+     * Provides creation time of this client.
+     *
+     * @return Creation time of this client.
+     */
+    public long creationTime();
+
+    /**
+     * Provides number of pending messages.
+     *
+     * @return Number of pending messages.
+     */
+    public int messagesQueueSize();
+
     /**
      * @param data Data to send.
      * @throws IgniteCheckedException If failed.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 4e43d28ea7e..aaa78cd1769 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -151,7 +151,8 @@ public class GridNioServer<T> {
     public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME = 
"outboundMessagesQueueSize";
 
     /** */
-    public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC = 
"Number of messages waiting to be sent";
+    public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
+        = "Total number of messages waiting to be sent over all connections";
 
     /** */
     public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes";
@@ -2449,7 +2450,7 @@ public class GridNioServer<T> {
                         .append(", bytesRcvd0=").append(ses.bytesReceived0())
                         .append(", bytesSent=").append(ses.bytesSent())
                         .append(", bytesSent0=").append(ses.bytesSent0())
-                        .append(", opQueueSize=").append(ses.writeQueueSize());
+                        .append(", 
opQueueSize=").append(ses.messagesQueueSize());
 
                     if (!shortInfo) {
                         MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index 1b9ac6f874a..af0a3bba338 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -86,6 +86,13 @@ public interface GridNioSession {
      */
     public long lastSendTime();
 
+    /**
+     * Returns number of pending messages.
+     *
+     * @return Number of pending messages.
+     */
+    public int messagesQueueSize();
+
     /**
      * Returns time when last send was scheduled on this session.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index ef8aadd909f..0211eb85dca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -48,7 +48,7 @@ public class GridNioSessionImpl implements GridNioSession {
     private final InetSocketAddress rmtAddr;
 
     /** Session create timestamp. */
-    private long createTime;
+    private final long createTime;
 
     /** Session close timestamp. */
     private final AtomicLong closeTime = new AtomicLong();
@@ -78,7 +78,7 @@ public class GridNioSessionImpl implements GridNioSession {
     private volatile boolean readsPaused;
 
     /** Filter chain that will handle write and close requests. */
-    private GridNioFilterChain filterChain;
+    private final GridNioFilterChain filterChain;
 
     /** Accepted flag. */
     private final boolean accepted;
@@ -242,6 +242,11 @@ public class GridNioSessionImpl implements GridNioSession {
         return sndSchedTime;
     }
 
+    /** {@inheritDoc} */
+    @Override public int messagesQueueSize() {
+        return 0;
+    }
+
     /** {@inheritDoc} */
     @Override public <T> T meta(int key) {
         assert key < meta.length;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 7fceaad96ea..8004bce31ae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -412,12 +412,8 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
         return rmv;
     }
 
-    /**
-     * Gets number of write requests in a queue that have not been processed 
yet.
-     *
-     * @return Number of write requests.
-     */
-    int writeQueueSize() {
+    /** {@inheritDoc} */
+    @Override public int messagesQueueSize() {
         return queue.sizex();
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 056f4c20a32..4ff126fa4cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -25,7 +25,6 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -101,10 +100,7 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
         if (closed())
             throw new IgniteCheckedException("Client was closed: " + this);
 
-        IgniteInternalFuture<?> fut = ses.send(data);
-
-        if (fut.isDone())
-            fut.get();
+        ses.send(data);
     }
 
     /** {@inheritDoc} */
@@ -146,6 +142,16 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
             now - ses.lastSendTime());
     }
 
+    /** {@inheritDoc} */
+    @Override public long creationTime() {
+        return ses.createTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int messagesQueueSize() {
+        return ses.messagesQueueSize();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridTcpNioCommunicationClient.class, this, 
super.toString());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index 266fafa982b..2fb542ceb8b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -147,8 +147,8 @@ public class TcpCommunicationMetricsListener {
         rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, 
RECEIVED_MESSAGES_METRIC_DESC);
 
         spiCtx.addMetricRegistryCreationListener(mreg -> {
-            // Metrics for the specific nodes.
-            if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + 
SEPARATOR))
+            // Metrics for the specific nodes or other communication metrics.
+            if (!TcpCommunicationSpi.isCommunicationMetrics(mreg.name()))
                 return;
 
             ((MetricRegistryImpl)mreg).longAdderMetric(
@@ -380,7 +380,7 @@ public class TcpCommunicationMetricsListener {
         }
 
         for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
-            if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + 
SEPARATOR)) {
+            if (TcpCommunicationSpi.isCommunicationMetrics(mreg.name())) {
                 
mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
 
                 
mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2beea5091d1..27e2d69dc23 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -94,6 +94,7 @@ import org.jetbrains.annotations.TestOnly;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR;
 import static 
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.NOOP;
 import static 
org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX;
 
@@ -374,6 +375,17 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
         this.lsnr = lsnr;
     }
 
+    /**
+     * @param metricName Metric name.
+     * @return {@code True} if the metric name is a pure TCP Communication 
metric. {@code False} if is other metric or
+     * metric of other TCP Communication component.
+     */
+    public static boolean isCommunicationMetrics(String metricName) {
+        return metricName.startsWith(COMMUNICATION_METRICS_GROUP_NAME + 
SEPARATOR)
+            && 
!metricName.startsWith(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME + 
SEPARATOR)
+            && 
!metricName.equals(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME);
+    }
+
     /** {@inheritDoc} */
     @Override public int getSentMessagesCount() {
         // Listener could be not initialized yet, but discovery thread could 
try to aggregate metrics.
@@ -597,7 +609,7 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
         }
 
         if (cfg.connectionsPerNode() > 1)
-            connPlc = new RoundRobinConnectionPolicy(cfg);
+            connPlc = new RoundRobinConnectionPolicy(cfg.connectionsPerNode());
         else
             connPlc = new FirstConnectionPolicy();
 
@@ -684,7 +696,8 @@ public class TcpCommunicationSpi extends 
TcpCommunicationConfigInitializer {
             this,
             stateProvider,
             nioSrvWrapper,
-            getName()
+            getName(),
+            ((IgniteEx)ignite).context().metric()
         ));
 
         this.srvLsnr.setClientPool(clientPool);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index 42c21bbc5b5..9ae4b8d3b43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -23,9 +23,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.StringJoiner;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
@@ -36,6 +40,8 @@ import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
@@ -54,6 +60,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.util.Objects.nonNull;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DISABLED_CLIENT_PORT;
 import static 
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.nodeAddresses;
 import static 
org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.usePairedConnections;
@@ -65,9 +72,49 @@ public class ConnectionClientPool {
     /** Time threshold to log too long connection establish. */
     private static final int CONNECTION_ESTABLISH_THRESHOLD_MS = 100;
 
+    /** */
+    public static final long METRICS_UPDATE_THRESHOLD = U.millisToNanos(200);
+
+    /** */
+    public static final String SHARED_METRICS_REGISTRY_NAME = 
metricName(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME,
+        "connectionPool");
+
+    /** */
+    public static final String METRIC_NAME_POOL_SIZE = "maxConnectionsCnt";
+
+    /** */
+    public static final String METRIC_NAME_PAIRED_CONNS = "isPaired";
+
+    /** */
+    public static final String METRIC_NAME_ASYNC_CONNS = "isAsync";
+
+    /** It is handy for a user to see consistent id of a problematic node. */
+    public static final String METRIC_NAME_CONSIST_ID = "consistentId";
+
+    /** */
+    public static final String METRIC_NAME_CUR_CNT = "currentConnectionsCnt";
+
+    /** */
+    public static final String METRIC_NAME_MSG_QUEUE_SIZE = 
"outboundMessagesQueueSize";
+
+    /** */
+    public static final String METRIC_NAME_REMOVED_CNT = 
"removedConnectionsCnt";
+
+    /** */
+    public static final String METRIC_NAME_MAX_NET_IDLE_TIME = 
"maxNetworkIdleTime";
+
+    /** */
+    public static final String METRIC_NAME_AVG_LIFE_TIME = 
"avgConnectionLifetime";
+
+    /** */
+    public static final String METRIC_NAME_ACQUIRING_THREADS_CNT = 
"acquiringThreadsCnt";
+
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
GridConcurrentFactory.newMap();
 
+    /** Metrics for each remote node. */
+    private final Map<UUID, NodeMetrics> metrics;
+
     /** Config. */
     private final TcpCommunicationConfiguration cfg;
 
@@ -116,6 +163,12 @@ public class ConnectionClientPool {
     private boolean forcibleNodeKillEnabled = IgniteSystemProperties
         .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
 
+    /** */
+    private final GridMetricManager metricsMgr;
+
+    /** */
+    private volatile AtomicBoolean asyncMetric;
+
     /**
      * @param cfg Config.
      * @param attrs Attributes.
@@ -129,6 +182,7 @@ public class ConnectionClientPool {
      * @param clusterStateProvider Cluster state provider.
      * @param nioSrvWrapper Nio server wrapper.
      * @param igniteInstanceName Ignite instance name.
+     * @param metricsMgr Metrics manager. If {@code null}, no metrics are 
created.
      */
     public ConnectionClientPool(
         TcpCommunicationConfiguration cfg,
@@ -142,23 +196,46 @@ public class ConnectionClientPool {
         TcpCommunicationSpi tcpCommSpi,
         ClusterStateProvider clusterStateProvider,
         GridNioServerWrapper nioSrvWrapper,
-        String igniteInstanceName
+        String igniteInstanceName,
+        GridMetricManager metricsMgr
     ) {
         this.cfg = cfg;
         this.attrs = attrs;
         this.log = log;
         this.metricsLsnr = metricsLsnr;
         this.locNodeSupplier = locNodeSupplier;
-        this.nodeGetter = nodeGetter;
         this.msgFormatterSupplier = msgFormatterSupplier;
         this.registry = registry;
         this.tcpCommSpi = tcpCommSpi;
         this.clusterStateProvider = clusterStateProvider;
         this.nioSrvWrapper = nioSrvWrapper;
+        this.metricsMgr = metricsMgr;
+
+        this.nodeGetter = new Function<>() {
+            @Override public ClusterNode apply(UUID nodeId) {
+                ClusterNode node = nodeGetter.apply(nodeId);
+
+                if (node == null)
+                    removeNodeMetrics(nodeId);
+
+                return node;
+            }
+        };
 
         this.handshakeTimeoutExecutorService = 
newSingleThreadScheduledExecutor(
             new IgniteThreadFactory(igniteInstanceName, 
"handshake-timeout-client")
         );
+
+        if (metricsMgr != null) {
+            MetricRegistryImpl mreg = 
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
+
+            mreg.register(METRIC_NAME_POOL_SIZE, () -> 
cfg.connectionsPerNode(), "Maximal connections number to a remote node.");
+            mreg.register(METRIC_NAME_PAIRED_CONNS, () -> 
cfg.usePairedConnections(), "Paired connections flag.");
+
+            metrics = new ConcurrentHashMap<>(64, 0.75f, Math.max(16, 
Runtime.getRuntime().availableProcessors()));
+        }
+        else
+            metrics = null;
     }
 
     /**
@@ -167,6 +244,10 @@ public class ConnectionClientPool {
     public void stop() {
         this.stopping = true;
 
+        metricsMgr.remove(SHARED_METRICS_REGISTRY_NAME);
+
+        clients.keySet().forEach(this::removeNodeMetrics);
+
         for (GridFutureAdapter<GridCommunicationClient> fut : 
clientFuts.values()) {
             if (fut instanceof ConnectionRequestFuture) {
                 // There's no way it would be done by itself at this point.
@@ -186,155 +267,188 @@ public class ConnectionClientPool {
      * @throws IgniteCheckedException Thrown if any exception occurs.
      */
     public GridCommunicationClient reserveClient(ClusterNode node, int 
connIdx) throws IgniteCheckedException {
-        assert node != null;
-        assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
-            || !(cfg.usePairedConnections() && usePairedConnections(node, 
attrs.pairedConnection()))
-            || GridNioServerWrapper.isChannelConnIdx(connIdx) : "Wrong 
communication connection index: " + connIdx;
-
-        if (locNodeSupplier.get().isClient()) {
-            if (node.isClient()) {
-                if (DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
-                    throw new IgniteSpiException("Cannot send message to the 
client node with no server socket opened.");
+        NodeMetrics nodeMetrics = metrics.get(node.id());
+
+        if (nodeMetrics != null)
+            nodeMetrics.acquiringThreadsCnt.incrementAndGet();
+
+        try {
+            assert node != null;
+            assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
+                || !(cfg.usePairedConnections() && usePairedConnections(node, 
attrs.pairedConnection()))
+                || GridNioServerWrapper.isChannelConnIdx(connIdx) : "Wrong 
communication connection index: " + connIdx;
+
+            if (locNodeSupplier.get().isClient()) {
+                if (node.isClient()) {
+                    if 
(DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
+                        throw new IgniteSpiException("Cannot send message to 
the client node with no server socket opened.");
+                }
             }
-        }
 
-        UUID nodeId = node.id();
+            UUID nodeId = node.id();
 
-        if (log.isDebugEnabled())
-            log.debug("The node client is going to reserve a connection 
[nodeId=" + node.id() + ", connIdx=" + connIdx + "]");
+            if (log.isDebugEnabled())
+                log.debug("The node client is going to reserve a connection 
[nodeId=" + node.id() + ", connIdx=" + connIdx + "]");
 
-        while (true) {
-            GridCommunicationClient[] curClients = clients.get(nodeId);
+            while (true) {
+                GridCommunicationClient[] curClients = clients.get(nodeId);
+
+                GridCommunicationClient client = curClients != null && connIdx 
< curClients.length ?
+                    curClients[connIdx] : null;
 
-            GridCommunicationClient client = curClients != null && connIdx < 
curClients.length ?
-                curClients[connIdx] : null;
+                if (client == null) {
+                    if (stopping)
+                        throw new IgniteSpiException("Node is stopping.");
 
-            if (client == null) {
-                if (stopping)
-                    throw new IgniteSpiException("Node is stopping.");
+                    // Do not allow concurrent connects.
+                    GridFutureAdapter<GridCommunicationClient> fut = new 
ConnectFuture();
 
-                // Do not allow concurrent connects.
-                GridFutureAdapter<GridCommunicationClient> fut = new 
ConnectFuture();
+                    ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, 
-1);
 
-                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+                    GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(connKey, fut);
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(connKey, fut);
+                    if (oldFut == null) {
+                        try {
+                            GridCommunicationClient[] curClients0 = 
clients.get(nodeId);
 
-                if (oldFut == null) {
-                    try {
-                        GridCommunicationClient[] curClients0 = 
clients.get(nodeId);
+                            GridCommunicationClient client0 = curClients0 != 
null && connIdx < curClients0.length ?
+                                curClients0[connIdx] : null;
 
-                        GridCommunicationClient client0 = curClients0 != null 
&& connIdx < curClients0.length ?
-                            curClients0[connIdx] : null;
+                            if (client0 == null) {
+                                client0 = createCommunicationClient(node, 
connIdx);
 
-                        if (client0 == null) {
-                            client0 = createCommunicationClient(node, connIdx);
+                                if (client0 != null) {
+                                    addNodeClient(node, connIdx, client0);
 
-                            if (client0 != null) {
-                                addNodeClient(node, connIdx, client0);
+                                    if (client0 instanceof 
GridTcpNioCommunicationClient) {
+                                        GridTcpNioCommunicationClient 
tcpClient = ((GridTcpNioCommunicationClient)client0);
 
-                                if (client0 instanceof 
GridTcpNioCommunicationClient) {
-                                    GridTcpNioCommunicationClient tcpClient = 
((GridTcpNioCommunicationClient)client0);
+                                        if (tcpClient.session().closeTime() > 
0 && removeNodeClient(nodeId, client0)) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Session was closed 
after client creation, will retry " +
+                                                    "[node=" + node + ", 
client=" + client0 + ']');
+                                            }
 
-                                    if (tcpClient.session().closeTime() > 0 && 
removeNodeClient(nodeId, client0)) {
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("Session was closed 
after client creation, will retry " +
-                                                "[node=" + node + ", client=" 
+ client0 + ']');
+                                            client0 = null;
                                         }
-
-                                        client0 = null;
                                     }
                                 }
-                            }
-                            else {
-                                U.sleep(200);
+                                else {
+                                    U.sleep(200);
 
-                                if (nodeGetter.apply(node.id()) == null)
-                                    throw new 
ClusterTopologyCheckedException("Failed to send message " +
-                                        "(node left topology): " + node);
+                                    if (nodeGetter.apply(node.id()) == null)
+                                        throw new 
ClusterTopologyCheckedException("Failed to send message " +
+                                            "(node left topology): " + node);
+                                }
                             }
-                        }
 
-                        fut.onDone(client0);
-                    }
-                    catch (NodeUnreachableException e) {
-                        log.warning(e.getMessage());
+                            fut.onDone(client0);
+                        }
+                        catch (NodeUnreachableException e) {
+                            log.warning(e.getMessage());
 
-                        fut = handleUnreachableNodeException(node, connIdx, 
fut, e);
-                    }
-                    catch (Throwable e) {
-                        if (e instanceof NodeUnreachableException)
-                            throw e;
+                            fut = handleUnreachableNodeException(node, 
connIdx, fut, e);
+                        }
+                        catch (Throwable e) {
+                            if (e instanceof NodeUnreachableException)
+                                throw e;
 
-                        fut.onDone(e);
+                            fut.onDone(e);
 
-                        if (e instanceof IgniteTooManyOpenFilesException)
-                            throw e;
+                            if (e instanceof IgniteTooManyOpenFilesException)
+                                throw e;
 
-                        if (e instanceof Error)
-                            throw (Error)e;
-                    }
-                    finally {
-                        clientFuts.remove(connKey, fut);
+                            if (e instanceof Error)
+                                throw (Error)e;
+                        }
+                        finally {
+                            clientFuts.remove(connKey, fut);
+                        }
                     }
-                }
-                else
-                    fut = oldFut;
+                    else
+                        fut = oldFut;
 
-                long clientReserveWaitTimeout = registry != null ? 
registry.getSystemWorkerBlockedTimeout() / 3
-                    : cfg.connectionTimeout() / 3;
+                    long clientReserveWaitTimeout = registry != null ? 
registry.getSystemWorkerBlockedTimeout() / 3
+                        : cfg.connectionTimeout() / 3;
 
-                long currTimeout = System.currentTimeMillis();
+                    long currTimeout = System.currentTimeMillis();
 
-                // This cycle will eventually quit when future is completed by 
concurrent thread reserving client.
-                while (true) {
-                    try {
-                        client = fut.get(clientReserveWaitTimeout, 
TimeUnit.MILLISECONDS);
+                    // This cycle will eventually quit when future is 
completed by concurrent thread reserving client.
+                    while (true) {
+                        try {
+                            client = fut.get(clientReserveWaitTimeout, 
TimeUnit.MILLISECONDS);
 
-                        break;
-                    }
-                    catch (IgniteFutureTimeoutCheckedException ignored) {
-                        currTimeout += clientReserveWaitTimeout;
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(
-                                "Still waiting for reestablishing connection 
to node " +
-                                    "[nodeId=" + node.id() + ", waitingTime=" 
+ currTimeout + "ms]"
-                            );
+                            break;
                         }
+                        catch (IgniteFutureTimeoutCheckedException ignored) {
+                            currTimeout += clientReserveWaitTimeout;
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(
+                                    "Still waiting for reestablishing 
connection to node " +
+                                        "[nodeId=" + node.id() + ", 
waitingTime=" + currTimeout + "ms]"
+                                );
+                            }
 
-                        if (registry != null) {
-                            GridWorker wrkr = 
registry.worker(Thread.currentThread().getName());
+                            if (registry != null) {
+                                GridWorker wrkr = 
registry.worker(Thread.currentThread().getName());
 
-                            if (wrkr != null)
-                                wrkr.updateHeartbeat();
+                                if (wrkr != null)
+                                    wrkr.updateHeartbeat();
+                            }
                         }
                     }
-                }
 
-                if (client == null) {
-                    if (clusterStateProvider.isLocalNodeDisconnected())
-                        throw new IgniteCheckedException("Unable to create TCP 
client due to local node disconnecting.");
-                    else
-                        continue;
+                    if (client == null) {
+                        if (clusterStateProvider.isLocalNodeDisconnected())
+                            throw new IgniteCheckedException("Unable to create 
TCP client due to local node disconnecting.");
+                        else
+                            continue;
+                    }
+
+                    if (nodeGetter.apply(nodeId) == null) {
+                        if (removeNodeClient(nodeId, client))
+                            client.forceClose();
+
+                        throw new IgniteSpiException("Destination node is not 
in topology: " + node.id());
+                    }
                 }
 
-                if (nodeGetter.apply(nodeId) == null) {
-                    if (removeNodeClient(nodeId, client))
-                        client.forceClose();
+                assert connIdx == client.connectionIndex() : client;
 
-                    throw new IgniteSpiException("Destination node is not in 
topology: " + node.id());
+                if (client.reserve()) {
+                    updateClientAcquiredMetric(client);
+
+                    return client;
                 }
+                else
+                    // Client has just been closed by idle worker. Help it and 
try again.
+                    removeNodeClient(nodeId, client);
             }
+        }
+        finally {
+            if (nodeMetrics != null)
+                nodeMetrics.acquiringThreadsCnt.decrementAndGet();
+        }
+    }
 
-            assert connIdx == client.connectionIndex() : client;
+    /** */
+    private void updateClientAcquiredMetric(GridCommunicationClient client) {
+        if (asyncMetric == null) {
+            synchronized (metrics) {
+                if (asyncMetric == null) {
+                    MetricRegistryImpl mreg = 
metricsMgr.registry(SHARED_METRICS_REGISTRY_NAME);
 
-            if (client.reserve())
-                return client;
-            else
-                // Client has just been closed by idle worker. Help it and try 
again.
-                removeNodeClient(nodeId, client);
+                    // We assume that all the clients have the same async flag.
+                    asyncMetric = new AtomicBoolean(client.async());
+
+                    mreg.register(METRIC_NAME_ASYNC_CONNS, () -> 
asyncMetric.get(), "Asynchronous flag. If TRUE, " +
+                        "connections put data in a queue (with some 
preprocessing) instead of immediate sending.");
+                }
+            }
         }
+        else
+            assert client.async() == asyncMetric.get();
     }
 
     /**
@@ -514,7 +628,19 @@ public class ConnectionClientPool {
                 newClients = new 
GridCommunicationClient[cfg.connectionsPerNode()];
                 newClients[connIdx] = addClient;
 
-                if (clients.putIfAbsent(node.id(), newClients) == null)
+                curClients = clients.compute(node.id(), (nodeId0, clients0) -> 
{
+                    if (clients0 == null) {
+                        // Syncs metrics creation on this map.
+                        if (metricsMgr != null)
+                            createNodeMetrics(node);
+
+                        return newClients;
+                    }
+
+                    return clients0;
+                });
+
+                if (curClients != null)
                     break;
             }
             else {
@@ -530,15 +656,108 @@ public class ConnectionClientPool {
         }
     }
 
+    /** */
+    private void createNodeMetrics(ClusterNode node) {
+        MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.id()));
+
+        assert !mreg.iterator().hasNext() : "Node connection pools metrics 
aren't empty.";
+
+        mreg.register(METRIC_NAME_CONSIST_ID, () -> 
node.consistentId().toString(), String.class,
+            "Consistent id of the remote node as string.");
+
+        mreg.register(METRIC_NAME_CUR_CNT, () -> 
updatedNodeMetrics(node.id()).connsCnt,
+            "Number of current connections to the remote node.");
+
+        mreg.register(METRIC_NAME_MSG_QUEUE_SIZE, () -> 
updatedNodeMetrics(node.id()).msgsQueueSize,
+            "Overal number of pending messages to the remote node.");
+
+        mreg.register(METRIC_NAME_MAX_NET_IDLE_TIME, () -> 
updatedNodeMetrics(node.id()).maxIdleTime,
+            "Maximal idle time of physical sending or receiving data in 
milliseconds.");
+
+        mreg.register(METRIC_NAME_AVG_LIFE_TIME, () -> 
updatedNodeMetrics(node.id()).avgLifetime,
+            "Average connection lifetime in milliseconds.");
+
+        mreg.register(METRIC_NAME_REMOVED_CNT, () -> 
updatedNodeMetrics(node.id()).removedConnectionsCnt.get(),
+            "Total number of removed connections.");
+
+        mreg.register(METRIC_NAME_ACQUIRING_THREADS_CNT, () -> 
updatedNodeMetrics(node.id()).acquiringThreadsCnt.get(),
+            "Number of threads currently acquiring a connection.");
+    }
+
+    /** */
+    private NodeMetrics updatedNodeMetrics(UUID nodeId) {
+        long nowNanos = System.nanoTime();
+
+        NodeMetrics res = metrics.get(nodeId);
+
+        if (res == null || (nowNanos - res.updateTs > METRICS_UPDATE_THRESHOLD 
&& res.canUpdate())) {
+            GridCommunicationClient[] nodeClients = clients.get(nodeId);
+
+            // Node might already leave the cluster.
+            if (nodeClients != null) {
+                long nowMillis = U.currentTimeMillis();
+
+                res = new NodeMetrics(res);
+
+                long avgLifetime = 0;
+                long maxIdleTime = 0;
+
+                for (GridCommunicationClient nodeClient : nodeClients) {
+                    if (nodeClient == null)
+                        continue;
+
+                    ++res.connsCnt;
+
+                    avgLifetime += nowMillis - nodeClient.creationTime();
+
+                    long nodeIdleTime = nodeClient.getIdleTime();
+
+                    if (nodeIdleTime > maxIdleTime)
+                        maxIdleTime = nodeIdleTime;
+
+                    res.msgsQueueSize += nodeClient.messagesQueueSize();
+                }
+
+                if (res.connsCnt != 0)
+                    res.avgLifetime = avgLifetime / res.connsCnt;
+
+                res.maxIdleTime = maxIdleTime;
+
+                NodeMetrics res0 = res;
+
+                res.updateTs = System.nanoTime();
+
+                // Node might already leave the cluster. Syncs metrics removal 
on the clients map.
+                clients.compute(nodeId, (nodeId0, clients) -> {
+                    if (clients == null)
+                        removeNodeMetrics(nodeId);
+                    else
+                        metrics.put(nodeId, res0);
+
+                    return clients;
+                });
+            }
+            else {
+                removeNodeMetrics(nodeId);
+
+                res = null;
+            }
+        }
+
+        return res == null ? NodeMetrics.EMPTY : res;
+    }
+
+    /** */
+    public static String nodeMetricsRegName(UUID nodeId) {
+        return metricName(SHARED_METRICS_REGISTRY_NAME, nodeId.toString());
+    }
+
     /**
      * @param nodeId Node ID.
      * @param rmvClient Client to remove.
      * @return {@code True} if client was removed.
      */
     public boolean removeNodeClient(UUID nodeId, GridCommunicationClient 
rmvClient) {
-        if (log.isDebugEnabled())
-            log.debug("The client was removed [nodeId=" + nodeId + ",  
client=" + rmvClient.toString() + "].");
-
         for (; ; ) {
             GridCommunicationClient[] curClients = clients.get(nodeId);
 
@@ -551,8 +770,17 @@ public class ConnectionClientPool {
 
             newClients[rmvClient.connectionIndex()] = null;
 
-            if (clients.replace(nodeId, curClients, newClients))
+            if (clients.replace(nodeId, curClients, newClients)) {
+                NodeMetrics nodeMetrics = metrics.get(nodeId);
+
+                if (nodeMetrics != null && nodeMetrics != NodeMetrics.EMPTY)
+                    nodeMetrics.removedConnectionsCnt.addAndGet(1);
+
+                if (log.isDebugEnabled())
+                    log.debug("The client was removed [nodeId=" + nodeId + ",  
client=" + rmvClient.toString() + "].");
+
                 return true;
+            }
         }
     }
 
@@ -567,6 +795,8 @@ public class ConnectionClientPool {
         if (log.isDebugEnabled())
             log.debug("The node client connections were closed [nodeId=" + 
nodeId + "]");
 
+        removeNodeMetrics(nodeId);
+
         GridCommunicationClient[] clients = this.clients.remove(nodeId);
         if (nonNull(clients)) {
             for (GridCommunicationClient client : clients)
@@ -589,6 +819,8 @@ public class ConnectionClientPool {
     public void onNodeLeft(UUID nodeId) {
         GridCommunicationClient[] clients0 = clients.remove(nodeId);
 
+        removeNodeMetrics(nodeId);
+
         if (clients0 != null) {
             for (GridCommunicationClient client : clients0) {
                 if (client != null) {
@@ -603,6 +835,13 @@ public class ConnectionClientPool {
         }
     }
 
+    /** */
+    private void removeNodeMetrics(UUID nodeId) {
+        metricsMgr.remove(nodeMetricsRegName(nodeId));
+
+        metrics.remove(nodeId);
+    }
+
     /**
      * @param id Id.
      */
@@ -669,4 +908,57 @@ public class ConnectionClientPool {
     public void metricsListener(@Nullable TcpCommunicationMetricsListener 
metricsLsnr) {
         this.metricsLsnr = metricsLsnr;
     }
+
+    /** */
+    private static final class NodeMetrics {
+        /** Avoids NPEs on metrics getting because nodes leave cluster 
asynchronously. */
+        private static final NodeMetrics EMPTY = new NodeMetrics(null);
+
+        /** */
+        private volatile long updateTs = System.nanoTime();
+
+        /** */
+        private volatile boolean updatingFlag;
+
+        /** */
+        private int connsCnt;
+
+        /** */
+        private int msgsQueueSize;
+
+        /** */
+        private long maxIdleTime;
+
+        /** */
+        private long avgLifetime;
+
+        /** */
+        private final AtomicLong removedConnectionsCnt;
+
+        /** */
+        private final AtomicInteger acquiringThreadsCnt;
+
+        /** */
+        private NodeMetrics(@Nullable NodeMetrics prev) {
+            this.removedConnectionsCnt = prev == null ? new AtomicLong() : 
prev.removedConnectionsCnt;
+            this.acquiringThreadsCnt = prev == null ? new AtomicInteger() : 
prev.acquiringThreadsCnt;
+            this.avgLifetime = prev == null ? 0 : prev.avgLifetime;
+            this.maxIdleTime = prev == null ? 0 : prev.maxIdleTime;
+        }
+
+        /** */
+        private boolean canUpdate() {
+            if (updatingFlag)
+                return false;
+
+            synchronized (this) {
+                if (updatingFlag)
+                    return false;
+
+                updatingFlag = true;
+
+                return true;
+            }
+        }
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
index afdad6661a1..e211df235c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/RoundRobinConnectionPolicy.java
@@ -17,24 +17,32 @@
 
 package org.apache.ignite.spi.communication.tcp.internal;
 
-import org.apache.ignite.internal.util.typedef.internal.U;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Round robin connection policy.
  */
 public class RoundRobinConnectionPolicy implements ConnectionPolicy {
-    /** Config. */
-    private final TcpCommunicationConfiguration cfg;
+    /** Maximal connections number. */
+    private final int cnt;
+
+    /** */
+    private final AtomicInteger counter = new AtomicInteger();
 
     /**
-     * @param cfg Config.
+     * @param cnt Maximal connections number.
      */
-    public RoundRobinConnectionPolicy(TcpCommunicationConfiguration cfg) {
-        this.cfg = cfg;
+    public RoundRobinConnectionPolicy(int cnt) {
+        this.cnt = cnt;
     }
 
     /** {@inheritDoc} */
     @Override public int connectionIndex() {
-        return (int)(U.safeAbs(Thread.currentThread().getId()) % 
cfg.connectionsPerNode());
+        int idx = counter.getAndIncrement() % cnt;
+
+        if (idx < 0)
+            idx += cnt;
+
+        return idx;
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
index 6a04aeda125..b1a26b41b4a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/MockNioSession.java
@@ -104,6 +104,11 @@ public class MockNioSession extends 
GridMetadataAwareAdapter implements GridNioS
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public int messagesQueueSize() {
+        return 0;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> close() {
         return new GridFinishedFuture<>(true);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
new file mode 100644
index 00000000000..a4eb483d5e9
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.BooleanGauge;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.GridTestMessage;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
+import org.apache.ignite.spi.metric.IntMetric;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.Metric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_ACQUIRING_THREADS_CNT;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_AVG_LIFE_TIME;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_CONSIST_ID;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_CUR_CNT;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MAX_NET_IDLE_TIME;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_MSG_QUEUE_SIZE;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.METRIC_NAME_REMOVED_CNT;
+import static 
org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool.nodeMetricsRegName;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Tests metrics of {@link ConnectionClientPool}. */
+@RunWith(Parameterized.class)
+public class CommunicationConnectionPoolMetricsTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final int MIN_LOAD_THREADS = 2;
+
+    /** */
+    private volatile long maxConnIdleTimeout = 
TcpCommunicationSpi.DFLT_IDLE_CONN_TIMEOUT;
+
+    /** */
+    private volatile int createClientDelay;
+
+    /** */
+    @Parameterized.Parameter(0)
+    public int connsPerNode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean pairedConns;
+
+    /** */
+    @Parameterized.Parameter(2)
+    public int msgQueueLimit;
+
+    /** */
+    @Parameterized.Parameter(3)
+    public boolean clientLdr;
+
+    /** */
+    @Parameterized.Parameters(name = "connsPerNode={0}, pairedConns={1}, 
msgQueueLimit={2}, clientLdr={3}")
+    public static Collection<Object[]> params() {
+        return GridTestUtils.cartesianProduct(
+            F.asList(1, 4), // Connections per node.
+            F.asList(false, true), // Paired connections.
+            F.asList(0, 100), // Message queue limit.
+            F.asList(true, false) // Use client as a load.
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpCommunicationSpi communicationSpi = new TcpCommunicationSpi() {
+            @Override protected GridCommunicationClient 
createTcpClient(ClusterNode node,
+                int connIdx) throws IgniteCheckedException {
+                if (createClientDelay > 0)
+                    U.sleep(createClientDelay);
+
+                return super.createTcpClient(node, connIdx);
+            }
+        };
+
+        communicationSpi.setConnectionsPerNode(connsPerNode)
+            .setUsePairedConnections(pairedConns)
+            .setIdleConnectionTimeout(maxConnIdleTimeout)
+            .setMessageQueueLimit(msgQueueLimit);
+
+        cfg.setCommunicationSpi(communicationSpi);
+
+        cfg.setPluginProviders(new TestCommunicationMessagePluginProvider());
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testRemovedConnectionMetrics() throws Exception {
+        maxConnIdleTimeout = 500;
+
+        Ignite srvr = startGridsMultiThreaded(2);
+        Ignite cli = startClientGrid(G.allGrids().size());
+
+        Ignite ldr = clientLdr ? cli : srvr;
+
+        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+        AtomicBoolean runFlag = new AtomicBoolean(true);
+        TestMessage msg = new TestMessage();
+
+        IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg, 
null);
+
+        // Wait until all connections are created and used.
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+            assertTrue(waitForCondition(
+                () -> {
+                    IntMetric m = mreg.findMetric(METRIC_NAME_CUR_CNT);
+
+                    return m != null && m.value() == connsPerNode;
+                },
+                getTestTimeout())
+            );
+        }
+
+        // Ensure that the loading is ok.
+        assertTrue(runFlag.get());
+
+        runFlag.set(false);
+        loadFut.get(getTestTimeout());
+
+        // Wait until connections are closed.
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+            assertTrue(waitForCondition(() -> 
mreg.<LongMetric>findMetric(METRIC_NAME_REMOVED_CNT).value() >= connsPerNode,
+                getTestTimeout()));
+        }
+
+        dumpMetrics(ldr);
+    }
+
+    /** */
+    @Test
+    public void testIdleRemovedConnectionMetricsUnderLazyLoad() throws 
Exception {
+        maxConnIdleTimeout = 10;
+
+        Ignite srvr = startGridsMultiThreaded(2);
+        Ignite cli = startClientGrid(G.allGrids().size());
+
+        Ignite ldr = clientLdr ? cli : srvr;
+
+        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+        AtomicBoolean runFlag = new AtomicBoolean(true);
+        Message msg = new TestMessage();
+
+        IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg, 
null, maxConnIdleTimeout, maxConnIdleTimeout * 4);
+
+        // Wait until all connections are created and used.
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+            assertTrue(waitForCondition(
+                () -> {
+                    LongMetric m = mreg.findMetric(METRIC_NAME_REMOVED_CNT);
+
+                    return m != null && m.value() >= connsPerNode * 4L;
+                },
+                getTestTimeout())
+            );
+        }
+
+        // Ensure that the loading is ok.
+        assertTrue(runFlag.get());
+
+        runFlag.set(false);
+        loadFut.get(getTestTimeout());
+
+        dumpMetrics(ldr);
+    }
+
+    /** */
+    @Test
+    public void testMetricsBasics() throws Exception {
+        int preloadCnt = 300;
+        int srvrCnt = 3;
+
+        Ignite srvr = startGridsMultiThreaded(srvrCnt);
+        Ignite cli = startClientGrid(G.allGrids().size());
+
+        Ignite ldr = clientLdr ? cli : srvr;
+
+        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+        MetricRegistryImpl mreg0 = 
metricsMgr.registry(ConnectionClientPool.SHARED_METRICS_REGISTRY_NAME);
+
+        assertEquals(connsPerNode, 
mreg0.<IntMetric>findMetric(ConnectionClientPool.METRIC_NAME_POOL_SIZE).value());
+        assertEquals(pairedConns, 
mreg0.<BooleanGauge>findMetric(ConnectionClientPool.METRIC_NAME_PAIRED_CONNS).value());
+
+        AtomicBoolean runFlag = new AtomicBoolean(true);
+        AtomicLong loadCnt = new AtomicLong(preloadCnt);
+        TestMessage msg = new TestMessage();
+
+        long loadMillis0 = System.currentTimeMillis();
+
+        IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg, 
loadCnt);
+
+        assertTrue(waitForCondition(() -> loadCnt.get() <= 0 || 
!runFlag.get(), getTestTimeout(), 25));
+
+        long loadMillis1 = System.currentTimeMillis() - loadMillis0;
+
+        // Ensure that preloaded without a failure.
+        assertTrue(runFlag.get());
+
+        long checkPeriod = 
U.nanosToMillis(ConnectionClientPool.METRICS_UPDATE_THRESHOLD / 3);
+
+        // Check metrics.
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            UUID nodeId = node.cluster().localNode().id();
+
+            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(nodeId));
+
+            // We assume that entire pool was used at least once.
+            assertTrue(waitForCondition(() -> connsPerNode == 
mreg.<IntMetric>findMetric(METRIC_NAME_CUR_CNT).value(),
+                getTestTimeout(), checkPeriod));
+
+            // Connections should not be idle under a heavy load.
+            assertTrue(waitForCondition(() -> 
mreg.<LongMetric>findMetric(METRIC_NAME_MAX_NET_IDLE_TIME).value() < 50,
+                getTestTimeout(), checkPeriod));
+
+            assertTrue(waitForCondition(() -> 
mreg.<LongMetric>findMetric(METRIC_NAME_AVG_LIFE_TIME).value() > loadMillis1,
+                getTestTimeout(), checkPeriod));
+
+            // Default connection idle and write timeouts are large enough. 
Connections should not be failed/deleted.
+            assertEquals(0, 
mreg.<LongMetric>findMetric(METRIC_NAME_REMOVED_CNT).value());
+
+            assertEquals(node.cluster().localNode().consistentId().toString(), 
mreg.findMetric(METRIC_NAME_CONSIST_ID).getAsString());
+        }
+
+        // Current connection implementations are async.
+        assertEquals(true, 
mreg0.<BooleanGauge>findMetric(ConnectionClientPool.METRIC_NAME_ASYNC_CONNS).value());
+
+        dumpMetrics(ldr);
+
+        // Check node metrics are cleared if a node stops.
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            // Keep last server node.
+            if (!node.cluster().localNode().isClient() && --srvrCnt == 0)
+                break;
+
+            UUID nodeId = node.cluster().localNode().id();
+
+            assertTrue(G.stop(node.name(), true));
+
+            assertTrue(waitForCondition(() -> {
+                MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(nodeId));
+
+                return mreg == null || !mreg.iterator().hasNext();
+            }, getTestTimeout()));
+        }
+
+        runFlag.set(false);
+        loadFut.get(getTestTimeout());
+
+        // Ensure that all the possible nodes are stopped.
+        assertTrue(waitForCondition(() -> ldr.cluster().nodes().size() == 
(clientLdr ? 2 : 1), getTestTimeout()));
+    }
+
+    /** Simulates delay/concurrency of connections acquire. */
+    @Test
+    public void testAcquiringThreadsCntMetric() throws Exception {
+        // Forces quick connection removing and recreating.
+        maxConnIdleTimeout = 1;
+        createClientDelay = 50;
+
+        Ignite srvr = startGridsMultiThreaded(2);
+        Ignite cli = startClientGrid(G.allGrids().size());
+
+        Ignite ldr = clientLdr ? cli : srvr;
+
+        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+        AtomicBoolean runFlag = new AtomicBoolean(true);
+
+        IgniteInternalFuture<?> monFut = GridTestUtils.runAsync(() -> {
+            while (runFlag.get()) {
+                for (Ignite node : G.allGrids()) {
+                    if (node == ldr)
+                        continue;
+
+                    MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+                    IntMetric m = 
mreg.findMetric(METRIC_NAME_ACQUIRING_THREADS_CNT);
+
+                    if (m != null && m.value() >= MIN_LOAD_THREADS)
+                        runFlag.set(false);
+                }
+
+                U.sleep(1);
+            }
+        });
+
+        IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> new 
TestMessage((int)maxConnIdleTimeout * 3), null);
+
+        monFut.get(getTestTimeout());
+
+        createClientDelay = 0;
+
+        loadFut.get(getTestTimeout());
+    }
+
+    /** */
+    @Test
+    public void testPendingMessagesMetric() throws Exception {
+        int preloadCnt = 500;
+
+        Ignite server = startGridsMultiThreaded(2);
+        Ignite client = startClientGrid(G.allGrids().size());
+
+        Ignite ldr = clientLdr ? client : server;
+
+        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+        AtomicBoolean runFlag = new AtomicBoolean(true);
+        AtomicLong loadCnt = new AtomicLong(preloadCnt);
+
+        AtomicInteger writeDelay = new AtomicInteger();
+
+        IgniteInternalFuture<?> loadFut = runLoad(
+            ldr,
+            runFlag,
+            () -> new TestMessage(writeDelay.get()),
+            loadCnt
+        );
+
+        assertTrue(waitForCondition(() -> loadCnt.get() <= 0 || 
!runFlag.get(), getTestTimeout(), 25));
+
+        // Ensure that preloaded without a failure.
+        assertTrue(runFlag.get());
+
+        // Will delay message queue processing but not network i/o.
+        writeDelay.set(50);
+
+        long checkPeriod = 
U.nanosToMillis(ConnectionClientPool.METRICS_UPDATE_THRESHOLD / 3);
+
+        // Check metrics.
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+            assertTrue(waitForCondition(
+                () -> 
mreg.<IntMetric>findMetric(METRIC_NAME_MSG_QUEUE_SIZE).value() >= Math.max(10, 
msgQueueLimit),
+                getTestTimeout(), checkPeriod)
+            );
+        }
+
+        writeDelay.set(0);
+
+        dumpMetrics(ldr);
+
+        runFlag.set(false);
+        loadFut.get(getTestTimeout());
+    }
+
+    /** */
+    private IgniteInternalFuture<?> runLoad(
+        Ignite ldrNode,
+        AtomicBoolean keepLoadingFlag,
+        Supplier<Message> msgSupplier,
+        @Nullable AtomicLong preloadCnt,
+        long minIterationDelayMs,
+        long maxIterationDelayMs
+    ) {
+        assert minIterationDelayMs >= 0 && maxIterationDelayMs >= 
minIterationDelayMs;
+
+        GridIoManager io = ((IgniteEx)ldrNode).context().io();
+
+        return GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                while (keepLoadingFlag.get()) {
+                    for (Ignite node : G.allGrids()) {
+                        if (node == ldrNode)
+                            continue;
+
+                        Message msg = msgSupplier.get();
+
+                        io.sendToCustomTopic(node.cluster().localNode().id(), 
"tt", msg, GridIoPolicy.PUBLIC_POOL);
+                    }
+
+                    if (preloadCnt != null && preloadCnt.get() > 0)
+                        preloadCnt.decrementAndGet();
+
+                    if (maxIterationDelayMs > 0)
+                        
U.sleep(ThreadLocalRandom.current().nextLong(minIterationDelayMs, 
maxIterationDelayMs + 1));
+                }
+            }
+            catch (Throwable ignored) {
+                // No-op.
+                keepLoadingFlag.set(false);
+            }
+        }, Math.max(MIN_LOAD_THREADS, connsPerNode + connsPerNode / 2), 
"testLoader");
+    }
+
+    /** */
+    private IgniteInternalFuture<?> runLoad(
+        Ignite ldrNode,
+        AtomicBoolean keepLoadingFlag,
+        Supplier<Message> msgSupplier,
+        @Nullable AtomicLong preloadCnt
+    ) {
+        return runLoad(ldrNode, keepLoadingFlag, msgSupplier, preloadCnt, 1, 
3);
+    }
+
+    /** */
+    private static void dumpMetrics(Ignite ldr) {
+        if (!log.isInfoEnabled())
+            return;
+
+        GridMetricManager metricsMgr = ((IgniteEx)ldr).context().metric();
+
+        for (Ignite node : G.allGrids()) {
+            if (node == ldr)
+                continue;
+
+            MetricRegistryImpl mreg = 
metricsMgr.registry(nodeMetricsRegName(node.cluster().localNode().id()));
+
+            StringBuilder b = new StringBuilder()
+                .append("Pool metrics from node 
").append(ldr.cluster().localNode().order())
+                .append(" to node ").append(node.cluster().localNode().order())
+                .append(": ");
+
+            for (Metric m : mreg) {
+                b.append(System.lineSeparator()).append('\t');
+
+                b.append(m.name()).append(" = ").append(m.getAsString());
+            }
+
+            log.info(b.toString());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 120 * 1000;
+    }
+
+    /** */
+    public static class TestCommunicationMessagePluginProvider extends 
AbstractTestPluginProvider {
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return "TEST_PLUGIN";
+        }
+
+        /** {@inheritDoc} */
+        @Override public void initExtensions(PluginContext ctx, 
ExtensionRegistry registry) {
+            registry.registerExtension(MessageFactoryProvider.class, new 
MessageFactoryProvider() {
+                @Override public void registerAll(MessageFactory factory) {
+                    factory.register(TestMessage.DIRECT_TYPE, 
TestMessage::new);
+                }
+            });
+        }
+    }
+
+    /** */
+    private static class TestMessage extends GridTestMessage {
+        /** */
+        private final int writeDelay;
+
+        /** */
+        public TestMessage(int writeDelay) {
+            this.writeDelay = writeDelay;
+        }
+
+        /** */
+        public TestMessage() {
+            this(0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) 
{
+            if (writeDelay > 0) {
+                try {
+                    U.sleep(writeDelay);
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    // No-op.
+                }
+            }
+
+            return super.writeTo(buf, writer);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
index f2b6bafccdf..ee665ca8a60 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java
@@ -19,11 +19,13 @@ package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
@@ -31,6 +33,7 @@ import 
org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -136,6 +139,7 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest 
extends GridCommonAbstr
         CommunicationSpi commSpi = 
srcNode.configuration().getCommunicationSpi();
 
         ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
GridTestUtils.getFieldValue(commSpi, "clientPool", "clients");
+        GridMetricManager metricMgr = GridTestUtils.getFieldValue(commSpi, 
"clientPool", "metricsMgr");
         ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = 
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "recoveryDescs");
         ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = 
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "outRecDescs");
         ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = 
GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper", "inRecDescs");
@@ -160,6 +164,16 @@ public class TcpCommunicationSpiHalfOpenedConnectionTest 
extends GridCommonAbstr
         // uninformed and force ping old connection.
         GridCommunicationClient[] clients0 = 
clients.remove(targetNode.cluster().localNode().id());
 
+        if (metricMgr != null) {
+            Map<UUID, ?> metrics = GridTestUtils.getFieldValue(commSpi, 
"clientPool", "metrics");
+
+            assert metrics != null;
+
+            metrics.remove(targetNode.cluster().localNode().id());
+
+            
metricMgr.remove(ConnectionClientPool.nodeMetricsRegName(targetNode.cluster().localNode().id()));
+        }
+
         for (GridCommunicationClient commClient : clients0)
             
lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new 
IOException("Test exception"));
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index e231898adf6..1029d060c2b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.util.nio.TcpCommunicationSpiSslVolatilePayloadTest;
 import org.apache.ignite.spi.communication.tcp.ClientExceptionsUtilsTest;
+import 
org.apache.ignite.spi.communication.tcp.CommunicationConnectionPoolMetricsTest;
 import org.apache.ignite.spi.communication.tcp.GridCacheDhtLockBackupSelfTest;
 import 
org.apache.ignite.spi.communication.tcp.GridSandboxedClientWithoutNetworkTest;
 import 
org.apache.ignite.spi.communication.tcp.GridTcpCommunicationInverseConnectionEstablishingTest;
@@ -100,6 +101,8 @@ import org.junit.runners.Suite;
 
     TcpCommunicationStatisticsTest.class,
 
+    CommunicationConnectionPoolMetricsTest.class,
+
     IgniteTcpCommunicationHandshakeWaitTest.class,
     IgniteTcpCommunicationHandshakeWaitSslTest.class,
     IgniteTcpCommunicationConnectOnInitTest.class,
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
index 0f65306eac6..b45b79ab2fa 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
@@ -774,11 +774,21 @@ class ZookeeperDiscoverySpiTestBase extends 
GridCommonAbstractTest {
                 delegate.release();
             }
 
+            /** {@inheritDoc} */
+            @Override public long creationTime() {
+                return delegate.creationTime();
+            }
+
             /** {@inheritDoc} */
             @Override public long getIdleTime() {
                 return delegate.getIdleTime();
             }
 
+            /** {@inheritDoc} */
+            @Override public int messagesQueueSize() {
+                return delegate.messagesQueueSize();
+            }
+
             /** {@inheritDoc} */
             @Override public void sendMessage(ByteBuffer data) throws 
IgniteCheckedException {
                 if (failure && !matrix.hasConnection(locNode, remoteNode))

Reply via email to