Repository: ignite
Updated Branches:
  refs/heads/ignite-2.4 cf10a134b -> 6a8280e53


IGNITE-7866: Fixed TcpCommunicationSpi metrics performance. This closes #3593.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a8280e5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a8280e5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a8280e5

Branch: refs/heads/ignite-2.4
Commit: 6a8280e53d3f59ee6c6c359de20afee8f0e9e774
Parents: cf10a134
Author: devozerov <voze...@gridgain.com>
Authored: Fri Mar 2 13:47:10 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Mar 2 13:47:10 2018 +0300

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |   5 +
 .../impl/GridTcpRouterNioListenerAdapter.java   |   5 +
 .../ignite/internal/util/nio/GridNioServer.java |  48 ++-
 .../util/nio/GridNioServerListener.java         |   8 +
 .../util/nio/GridNioServerListenerAdapter.java  |   5 +
 .../tcp/TcpCommunicationMetricsListener.java    | 312 +++++++++++++++----
 .../communication/tcp/TcpCommunicationSpi.java  |  14 +-
 .../tcp/TcpCommunicationStatisticsTest.java     |  14 +-
 8 files changed, 329 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index aa06322..e16ce07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -631,6 +631,11 @@ public abstract class GridClientConnectionManagerAdapter 
implements GridClientCo
         }
 
         /** {@inheritDoc} */
+        @Override public void onMessageSent(GridNioSession ses, Object msg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public void onMessage(GridNioSession ses, Object msg) {
             GridClientFutureAdapter<Boolean> handshakeFut =
                 ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
index 6bcea09..22f5152 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
@@ -111,6 +111,11 @@ public abstract class GridTcpRouterNioListenerAdapter 
implements GridNioServerLi
     }
 
     /** {@inheritDoc} */
+    @Override public void onMessageSent(GridNioSession ses, GridClientMessage 
msg) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("TypeMayBeWeakened")
     @Override public void onMessage(final GridNioSession ses, final 
GridClientMessage msg) {
         if (msg instanceof GridRouterRequest) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 3a88507..e0ec8d1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -153,6 +153,9 @@ public class GridNioServer<T> {
     /** Filter chain to use. */
     private final GridNioFilterChain<T> filterChain;
 
+    /** Server listener. */
+    private final GridNioServerListener<T> lsnr;
+
     /** Logger. */
     @GridToStringExclude
     private final IgniteLogger log;
@@ -312,6 +315,7 @@ public class GridNioServer<T> {
         this.msgQueueLsnr = msgQueueLsnr;
         this.selectorSpins = selectorSpins;
         this.readWriteSelectorsAssign = readWriteSelectorsAssign;
+        this.lsnr = lsnr;
 
         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), 
filters);
 
@@ -1381,8 +1385,12 @@ public class GridNioServer<T> {
 
                         finished = msg.writeTo(buf, writer);
 
-                        if (finished && writer != null)
-                            writer.reset();
+                        if (finished) {
+                            onMessageWritten(ses, msg);
+
+                            if (writer != null)
+                                writer.reset();
+                        }
                     }
 
                     // Fill up as many messages as possible to write buffer.
@@ -1406,8 +1414,12 @@ public class GridNioServer<T> {
 
                         finished = msg.writeTo(buf, writer);
 
-                        if (finished && writer != null)
-                            writer.reset();
+                        if (finished) {
+                            onMessageWritten(ses, msg);
+
+                            if (writer != null)
+                                writer.reset();
+                        }
                     }
 
                     int sesBufLimit = buf.limit();
@@ -1579,8 +1591,12 @@ public class GridNioServer<T> {
 
                 finished = msg.writeTo(buf, writer);
 
-                if (finished && writer != null)
-                    writer.reset();
+                if (finished) {
+                    onMessageWritten(ses, msg);
+
+                    if (writer != null)
+                        writer.reset();
+                }
             }
 
             // Fill up as many messages as possible to write buffer.
@@ -1604,8 +1620,12 @@ public class GridNioServer<T> {
 
                 finished = msg.writeTo(buf, writer);
 
-                if (finished && writer != null)
-                    writer.reset();
+                if (finished) {
+                    onMessageWritten(ses, msg);
+
+                    if (writer != null)
+                        writer.reset();
+                }
             }
 
             buf.flip();
@@ -1650,6 +1670,18 @@ public class GridNioServer<T> {
     }
 
     /**
+     * Handle message written event.
+     *
+     * @param ses Session.
+     * @param msg Message.
+     */
+    @SuppressWarnings("unchecked")
+    private void onMessageWritten(GridSelectorNioSessionImpl ses, Message msg) 
{
+        if (lsnr != null)
+            lsnr.onMessageSent(ses, (T)msg);
+    }
+
+    /**
      * Thread performing only read operations from the channel.
      */
     private abstract class AbstractNioClientWorker extends GridWorker 
implements GridNioWorker {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
index 29e482a..db28792 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
@@ -40,6 +40,14 @@ public interface GridNioServerListener<T> {
     public void onDisconnected(GridNioSession ses, @Nullable Exception e);
 
     /**
+     * Handle message sent.
+     *
+     * @param ses Session.
+     * @param msg Message.
+     */
+    void onMessageSent(GridNioSession ses, T msg);
+
+    /**
      * This method is called whenever a {@link GridNioParser} returns non-null 
value.
      *
      * @param ses Session on which message was received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
index 2cc16f8..5d222c1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
@@ -30,4 +30,9 @@ public abstract class GridNioServerListenerAdapter<T> 
implements GridNioServerLi
     @Override public void onSessionIdleTimeout(GridNioSession ses) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public void onMessageSent(GridNioSession ses, T msg) {
+        // No-op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index f9e35a5..838ee00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -17,52 +17,56 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jsr166.LongAdder8;
 
 /**
  * Statistics for {@link 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
  */
 public class TcpCommunicationMetricsListener implements GridNioMetricsListener{
-    /** Received messages count. */
-    private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
-
-    /** Sent messages count.*/
-    private final LongAdder8 sentMsgsCnt = new LongAdder8();
+    /** Counter factory. */
+    private static final Callable<LongHolder> HOLDER_FACTORY = new 
Callable<LongHolder>() {
+        @Override public LongHolder call() {
+            return new LongHolder();
+        }
+    };
 
     /** Received bytes count. */
-    private final LongAdder8 rcvdBytesCnt = new LongAdder8();
+    private final LongAdder rcvdBytesCnt = new LongAdder();
 
     /** Sent bytes count.*/
-    private final LongAdder8 sentBytesCnt = new LongAdder8();
+    private final LongAdder sentBytesCnt = new LongAdder();
 
-    /** Counter factory. */
-    private static final Callable<LongAdder8> LONG_ADDER_FACTORY = new 
Callable<LongAdder8>() {
-        @Override public LongAdder8 call() {
-            return new LongAdder8();
-        }
-    };
+    /** All registered metrics. */
+    private final Set<ThreadMetrics> allMetrics = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    /** Thread-local metrics. */
+    private final ThreadLocal<ThreadMetrics> threadMetrics = new 
ThreadLocal<ThreadMetrics>() {
+        @Override protected ThreadMetrics initialValue() {
+            ThreadMetrics metrics = new ThreadMetrics();
 
-    /** Received messages count grouped by message type. */
-    private final ConcurrentMap<String, LongAdder8> rcvdMsgsCntByType = new 
ConcurrentHashMap<>();
+            allMetrics.add(metrics);
 
-    /** Received messages count grouped by sender. */
-    private final ConcurrentMap<UUID, LongAdder8> rcvdMsgsCntByNode = new 
ConcurrentHashMap<>();
+            return metrics;
+        }
+    };
 
-    /** Sent messages count grouped by message type. */
-    private final ConcurrentMap<String, LongAdder8> sentMsgsCntByType = new 
ConcurrentHashMap<>();
+    /** Method to synchronize access to message type map. */
+    private final Object msgTypMapMux = new Object();
 
-    /** Sent messages count grouped by receiver. */
-    private final ConcurrentMap<UUID, LongAdder8> sentMsgsCntByNode = new 
ConcurrentHashMap<>();
+    /** Message type map. */
+    private volatile Map<Short, String> msgTypMap;
 
     /** {@inheritDoc} */
     @Override public void onBytesSent(int bytesCnt) {
@@ -84,16 +88,15 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
         assert msg != null;
         assert nodeId != null;
 
-        sentMsgsCnt.increment();
+        if (msg instanceof GridIoMessage) {
+            msg = ((GridIoMessage) msg).message();
 
-        if (msg instanceof GridIoMessage)
-            msg = ((GridIoMessage)msg).message();
+            updateMessageTypeMap(msg);
 
-        LongAdder8 cntByType = F.addIfAbsent(sentMsgsCntByType, 
msg.getClass().getSimpleName(), LONG_ADDER_FACTORY);
-        LongAdder8 cntByNode = F.addIfAbsent(sentMsgsCntByNode, nodeId, 
LONG_ADDER_FACTORY);
+            ThreadMetrics metrics = threadMetrics.get();
 
-        cntByType.increment();
-        cntByNode.increment();
+            metrics.onMessageSent(msg, nodeId);
+        }
     }
 
     /**
@@ -106,16 +109,15 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
         assert msg != null;
         assert nodeId != null;
 
-        rcvdMsgsCnt.increment();
+        if (msg instanceof GridIoMessage) {
+            msg = ((GridIoMessage) msg).message();
 
-        if (msg instanceof GridIoMessage)
-            msg = ((GridIoMessage)msg).message();
+            updateMessageTypeMap(msg);
 
-        LongAdder8 cntByType = F.addIfAbsent(rcvdMsgsCntByType, 
msg.getClass().getSimpleName(), LONG_ADDER_FACTORY);
-        LongAdder8 cntByNode = F.addIfAbsent(rcvdMsgsCntByNode, nodeId, 
LONG_ADDER_FACTORY);
+            ThreadMetrics metrics = threadMetrics.get();
 
-        cntByType.increment();
-        cntByNode.increment();
+            metrics.onMessageReceived(msg, nodeId);
+        }
     }
 
     /**
@@ -124,7 +126,17 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
      * @return Sent messages count.
      */
     public int sentMessagesCount() {
-        return sentMsgsCnt.intValue();
+        long res = 0;
+
+        for (ThreadMetrics metrics : allMetrics)
+            res += metrics.sentMsgsCnt;
+
+        int res0 = (int)res;
+
+        if (res0 < 0)
+            res0 = Integer.MAX_VALUE;
+
+        return res0;
     }
 
     /**
@@ -142,7 +154,17 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
      * @return Received messages count.
      */
     public int receivedMessagesCount() {
-        return rcvdMsgsCnt.intValue();
+        long res = 0;
+
+        for (ThreadMetrics metrics : allMetrics)
+            res += metrics.rcvdMsgsCnt;
+
+        int res0 = (int)res;
+
+        if (res0 < 0)
+            res0 = Integer.MAX_VALUE;
+
+        return res0;
     }
 
     /**
@@ -155,27 +177,40 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
     }
 
     /**
-     * Converts statistics from internal representation to JMX-readable format.
+     * Gets received messages counts (grouped by type).
      *
-     * @param srcStat Internal statistics representation.
-     * @return Result map.
+     * @return Map containing message types and respective counts.
      */
-    private <T> Map<T, Long> convertStatistics(Map<T, LongAdder8> srcStat) {
-        Map<T, Long> destStat = U.newHashMap(srcStat.size());
+    public Map<String, Long> receivedMessagesByType() {
+        Map<Short, Long> res = new HashMap<>();
 
-        for (Map.Entry<T, LongAdder8> entry : srcStat.entrySet())
-            destStat.put(entry.getKey(), entry.getValue().longValue());
+        for (ThreadMetrics metrics : allMetrics)
+            addMetrics(res, metrics.rcvdMsgsCntByType);
 
-        return destStat;
+        return convertMessageTypes(res);
     }
 
     /**
-     * Gets received messages counts (grouped by type).
+     * Convert message types.
      *
-     * @return Map containing message types and respective counts.
+     * @param input Input map.
+     * @return Result map.
      */
-    public Map<String, Long> receivedMessagesByType() {
-        return convertStatistics(rcvdMsgsCntByType);
+    private Map<String, Long> convertMessageTypes(Map<Short, Long> input) {
+        Map<String, Long> res = new HashMap<>(input.size());
+
+        Map<Short, String> msgTypMap0 = msgTypMap;
+
+        if (msgTypMap0 != null) {
+            for (Map.Entry<Short, Long> inputEntry : input.entrySet()) {
+                String typeName = msgTypMap0.get(inputEntry.getKey());
+
+                if (typeName != null)
+                    res.put(typeName, inputEntry.getValue());
+            }
+        }
+
+        return res;
     }
 
     /**
@@ -184,7 +219,12 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
      * @return Map containing sender nodes and respective counts.
      */
     public Map<UUID, Long> receivedMessagesByNode() {
-        return convertStatistics(rcvdMsgsCntByNode);
+        Map<UUID, Long> res = new HashMap<>();
+
+        for (ThreadMetrics metrics : allMetrics)
+            addMetrics(res, metrics.rcvdMsgsCntByNode);
+
+        return res;
     }
 
     /**
@@ -193,7 +233,12 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
      * @return Map containing message types and respective counts.
      */
     public Map<String, Long> sentMessagesByType() {
-        return convertStatistics(sentMsgsCntByType);
+        Map<Short, Long> res = new HashMap<>();
+
+        for (ThreadMetrics metrics : allMetrics)
+            addMetrics(res, metrics.sentMsgsCntByType);
+
+        return convertMessageTypes(res);
     }
 
     /**
@@ -202,23 +247,158 @@ public class TcpCommunicationMetricsListener implements 
GridNioMetricsListener{
      * @return Map containing receiver nodes and respective counts.
      */
     public Map<UUID, Long> sentMessagesByNode() {
-        return convertStatistics(sentMsgsCntByNode);
+        Map<UUID, Long> res = new HashMap<>();
+
+        for (ThreadMetrics metrics : allMetrics)
+            addMetrics(res, metrics.sentMsgsCntByNode);
+
+        return res;
     }
 
     /**
      * Resets metrics for this instance.
      */
     public void resetMetrics() {
-        // Can't use 'reset' method because it is not thread-safe
-        // according to javadoc.
-        sentMsgsCnt.add(-sentMsgsCnt.sum());
-        rcvdMsgsCnt.add(-rcvdMsgsCnt.sum());
-        sentBytesCnt.add(-sentBytesCnt.sum());
-        rcvdBytesCnt.add(-rcvdBytesCnt.sum());
-
-        sentMsgsCntByType.clear();
-        rcvdMsgsCntByType.clear();
-        sentMsgsCntByNode.clear();
-        rcvdMsgsCntByNode.clear();
+        for (ThreadMetrics metrics : allMetrics)
+            metrics.reset();
+    }
+
+    /**
+     * Add single metrics to the total.
+     *
+     * @param total Total.
+     * @param current Current metrics.
+     */
+    private <T> void addMetrics(Map<T, Long> total, Map<T, LongHolder> 
current) {
+        for (Map.Entry<T, LongHolder> entry : current.entrySet()) {
+            T key = entry.getKey();
+            long val = entry.getValue().val;
+
+            Long prevVal = total.get(key);
+
+            total.put(key, prevVal == null ? val : prevVal + val);
+        }
+    }
+
+    /**
+     * Update message type map.
+     *
+     * @param msg Message.
+     */
+    private void updateMessageTypeMap(Message msg) {
+        short typeId = msg.directType();
+
+        Map<Short, String> msgTypMap0 = msgTypMap;
+
+        if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
+            synchronized (msgTypMapMux) {
+                if (msgTypMap == null) {
+                    msgTypMap0 = new HashMap<>();
+
+                    msgTypMap0.put(typeId, msg.getClass().getName());
+
+                    msgTypMap = msgTypMap0;
+                }
+                else {
+                    if (!msgTypMap.containsKey(typeId)) {
+                        msgTypMap0 = new HashMap<>(msgTypMap);
+
+                        msgTypMap0.put(typeId, msg.getClass().getName());
+
+                        msgTypMap = msgTypMap0;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Long value holder.
+     */
+    private static class LongHolder {
+        /** Value. */
+        private long val;
+
+        /**
+         * Increment value.
+         */
+        private void increment() {
+            val++;
+        }
+    }
+
+    /**
+     * Thread-local metrics.
+     */
+    private static class ThreadMetrics {
+        /** Received messages count. */
+        private long rcvdMsgsCnt;
+
+        /** Sent messages count.*/
+        private long sentMsgsCnt;
+
+        /** Received messages count grouped by message type. */
+        private final HashMap<Short, LongHolder> rcvdMsgsCntByType = new 
HashMap<>();
+
+        /** Received messages count grouped by sender. */
+        private final HashMap<UUID, LongHolder> rcvdMsgsCntByNode = new 
HashMap<>();
+
+        /** Sent messages count grouped by message type. */
+        private final HashMap<Short, LongHolder> sentMsgsCntByType = new 
HashMap<>();
+
+        /** Sent messages count grouped by receiver. */
+        private final HashMap<UUID, LongHolder> sentMsgsCntByNode = new 
HashMap<>();
+
+        /**
+         * Collects statistics for message sent by SPI.
+         *
+         * @param msg Sent message.
+         * @param nodeId Receiver node id.
+         */
+        private void onMessageSent(Message msg, UUID nodeId) {
+            sentMsgsCnt++;
+
+            LongHolder cntByType = F.addIfAbsent(sentMsgsCntByType, 
msg.directType(), HOLDER_FACTORY);
+            LongHolder cntByNode = F.addIfAbsent(sentMsgsCntByNode, nodeId, 
HOLDER_FACTORY);
+
+            assert cntByType != null;
+            assert cntByNode != null;
+
+            cntByType.increment();
+            cntByNode.increment();
+        }
+
+        /**
+         * Collects statistics for message received by SPI.
+         *
+         * @param msg Received message.
+         * @param nodeId Sender node id.
+         */
+        private void onMessageReceived(Message msg, UUID nodeId) {
+            rcvdMsgsCnt++;
+
+            LongHolder cntByType = F.addIfAbsent(rcvdMsgsCntByType, 
msg.directType(), HOLDER_FACTORY);
+            LongHolder cntByNode = F.addIfAbsent(rcvdMsgsCntByNode, nodeId, 
HOLDER_FACTORY);
+
+            assert cntByType != null;
+            assert cntByNode != null;
+
+            cntByType.increment();
+            cntByNode.increment();
+        }
+
+        /**
+         * Reset metrics.
+         */
+        private void reset() {
+            rcvdMsgsCnt = 0;
+            sentMsgsCnt = 0;
+
+            sentMsgsCntByType.clear();
+            sentMsgsCntByNode.clear();
+
+            rcvdMsgsCntByType.clear();
+            rcvdMsgsCntByNode.clear();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3c5b5e9..27f5e3e 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -671,6 +671,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                 }
             }
 
+            @Override public void onMessageSent(GridNioSession ses, Message 
msg) {
+                ConnectionKey connKey = ses.meta(CONN_IDX_META);
+
+                if (connKey != null) {
+                    UUID nodeId = connKey.nodeId();
+
+                    metricsLsnr.onMessageSent(msg, nodeId);
+                }
+            }
+
             @Override public void onMessage(final GridNioSession ses, Message 
msg) {
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
@@ -2619,9 +2629,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
 
                     client.release();
 
-                    if (!retry)
-                        metricsLsnr.onMessageSent(msg, node.id());
-                    else {
+                    if (retry) {
                         removeNodeClient(node.id(), client);
 
                         ClusterNode node0 = getSpiContext().node(node.id());

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8280e5/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index e06a4bf..377d1eb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -30,7 +30,10 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
@@ -88,7 +91,7 @@ public class TcpCommunicationStatisticsTest extends 
GridCommonAbstractTest {
         @Override protected void notifyListener(UUID sndId, Message msg, 
IgniteRunnable msgC) {
             super.notifyListener(sndId, msg, msgC);
 
-            if (msg instanceof GridTestMessage)
+            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() 
instanceof GridTestMessage)
                 latch.countDown();
         }
     }
@@ -130,13 +133,14 @@ public class TcpCommunicationStatisticsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("ConstantConditions")
     public void testStatistics() throws Exception {
         startGrids(2);
 
         try {
             // Send custom message from node0 to node1.
-            
grid(0).configuration().getCommunicationSpi().sendMessage(grid(1).cluster().localNode(),
-                new GridTestMessage());
+            
grid(0).context().io().sendToGridTopic(grid(1).cluster().localNode(), 
GridTopic.TOPIC_IO_TEST, new GridTestMessage(), GridIoPolicy.PUBLIC_POOL);
+
 
             latch.await(10, TimeUnit.SECONDS);
 
@@ -180,8 +184,8 @@ public class TcpCommunicationStatisticsTest extends 
GridCommonAbstractTest {
                 // Node1 sent exactly the same types and count of messages as 
node0 received.
                 assertEquals(msgsSentByType1, msgsReceivedByType0);
 
-                assertEquals(1, 
msgsSentByType0.get(GridTestMessage.class.getSimpleName()).longValue());
-                assertEquals(1, 
msgsReceivedByType1.get(GridTestMessage.class.getSimpleName()).longValue());
+                assertEquals(1, 
msgsSentByType0.get(GridTestMessage.class.getName()).longValue());
+                assertEquals(1, 
msgsReceivedByType1.get(GridTestMessage.class.getName()).longValue());
             }
         }
         finally {

Reply via email to