Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.X f1423806e -> 6b6bc6a36
  refs/heads/trunk 48591489d -> 3ea0579d8


Expose time spent waiting in thread pool queue

Patch by Dikang Gu; reviewed by T Jake Luciani for CASSANDRA-8398


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b6bc6a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b6bc6a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b6bc6a3

Branch: refs/heads/cassandra-3.X
Commit: 6b6bc6a36c623b8074f0fb27c656b8c26c27cd7e
Parents: f142380
Author: Dikang Gu <dikan...@gmail.com>
Authored: Wed Nov 30 13:00:49 2016 -0800
Committer: Dikang Gu <dikan...@gmail.com>
Committed: Tue Dec 6 15:02:31 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/metrics/MessagingMetrics.java     | 16 ++++++++++
 .../cassandra/net/MessageDeliveryTask.java      |  6 ++++
 .../org/apache/cassandra/tools/NodeProbe.java   | 14 +++++++++
 .../tools/nodetool/stats/TpStatsHolder.java     | 13 ++++++++
 .../tools/nodetool/stats/TpStatsPrinter.java    | 17 +++++++++--
 .../cassandra/net/MessagingServiceTest.java     | 31 ++++++++++++++++++++
 7 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bddd823..8b2bed7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.12
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
  * Conditionally update index built status to avoid unnecessary flushes 
(CASSANDRA-12969)
  * NoReplicationTokenAllocator should work with zero replication factor 
(CASSANDRA-12983)
  * cqlsh auto completion: refactor definition of compaction strategy options 
(CASSANDRA-12946)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java 
b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index e126c93..5f640b9 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -38,11 +38,13 @@ public class MessagingMetrics
     private static final MetricNameFactory factory = new 
DefaultNameFactory("Messaging");
     public final Timer crossNodeLatency;
     public final ConcurrentHashMap<String, Timer> dcLatency;
+    public final ConcurrentHashMap<String, Timer> queueWaitLatency;
 
     public MessagingMetrics()
     {
         crossNodeLatency = 
Metrics.timer(factory.createMetricName("CrossNodeLatency"));
         dcLatency = new ConcurrentHashMap<>();
+        queueWaitLatency = new ConcurrentHashMap<>();
     }
 
     public void addTimeTaken(InetAddress from, long timeTaken)
@@ -56,4 +58,18 @@ public class MessagingMetrics
         timer.update(timeTaken, TimeUnit.MILLISECONDS);
         crossNodeLatency.update(timeTaken, TimeUnit.MILLISECONDS);
     }
+
+    public void addQueueWaitTime(String verb, long timeTaken)
+    {
+        if (timeTaken < 0)
+            // the measurement is not accurate, ignore the negative timeTaken
+            return;
+
+        Timer timer = queueWaitLatency.get(verb);
+        if (timer == null)
+        {
+            timer = queueWaitLatency.computeIfAbsent(verb, k -> 
Metrics.timer(factory.createMetricName(verb + "-WaitLatency")));
+        }
+        timer.update(timeTaken, TimeUnit.MILLISECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java 
b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index c91e9da..c7fc991 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -35,17 +36,22 @@ public class MessageDeliveryTask implements Runnable
 
     private final MessageIn message;
     private final int id;
+    private final long enqueueTime;
 
     public MessageDeliveryTask(MessageIn message, int id)
     {
         assert message != null;
         this.message = message;
         this.id = id;
+        this.enqueueTime = ApproximateTime.currentTimeMillis();
     }
 
     public void run()
     {
         MessagingService.Verb verb = message.verb;
+        MessagingService.instance().metrics.addQueueWaitTime(verb.toString(),
+                                                             
ApproximateTime.currentTimeMillis() - enqueueTime);
+
         long timeTaken = message.getLifetimeInMS();
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
             && timeTaken > message.getTimeout())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8c01891..a48baf8 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1354,6 +1354,20 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
+    public CassandraMetricsRegistry.JmxTimerMBean 
getMessagingQueueWaitMetrics(String verb)
+    {
+        try
+        {
+            return JMX.newMBeanProxy(mbeanServerConn,
+                                     new 
ObjectName("org.apache.cassandra.metrics:name=" + verb + 
"-WaitLatency,type=Messaging"),
+                                     
CassandraMetricsRegistry.JmxTimerMBean.class);
+        }
+        catch (MalformedObjectNameException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Retrieve Proxy metrics
      * @param metricName CompletedTasks, PendingTasks, BytesCompacted or 
TotalCompactionsCompleted.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java 
b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
index d70b4dd..f3e91dc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
@@ -39,6 +39,7 @@ public class TpStatsHolder implements StatsHolder
         HashMap<String, Object> result = new HashMap<>();
         HashMap<String, Map<String, Object>> threadPools = new HashMap<>();
         HashMap<String, Object> droppedMessage = new HashMap<>();
+        HashMap<String, double[]> waitLatencies = new HashMap<>();
 
         for (Map.Entry<String, String> tp : probe.getThreadPools().entries())
         {
@@ -53,8 +54,20 @@ public class TpStatsHolder implements StatsHolder
         result.put("ThreadPools", threadPools);
 
         for (Map.Entry<String, Integer> entry : 
probe.getDroppedMessages().entrySet())
+        {
             droppedMessage.put(entry.getKey(), entry.getValue());
+            try
+            {
+                waitLatencies.put(entry.getKey(), 
probe.metricPercentilesAsArray(probe.getMessagingQueueWaitMetrics(entry.getKey())));
+            }
+            catch (RuntimeException e)
+            {
+                // ignore the exceptions when fetching metrics
+            }
+        }
+
         result.put("DroppedMessage", droppedMessage);
+        result.put("WaitLatencies", waitLatencies);
 
         return result;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java 
b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
index b874746..45fc5ee 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
@@ -61,12 +61,25 @@ public class TpStatsPrinter
                            values.get("TotalBlockedTasks"));
             }
 
-            out.printf("%n%-20s%10s%n", "Message type", "Dropped");
+            out.printf("%n%-20s%10s%18s%18s%18s%18s%n", "Message type", 
"Dropped", "", "Latency waiting in queue (micros)", "", "");
+            out.printf("%-20s%10s%18s%18s%18s%18s%n", "", "", "50%", "95%", 
"99%", "Max");
 
             Map<Object, Object> droppedMessages = 
convertData.get("DroppedMessage") instanceof Map<?, ?> ? 
(Map)convertData.get("DroppedMessage") : Collections.emptyMap();
+            Map<Object, double[]> waitLatencies = 
convertData.get("WaitLatencies") instanceof Map<?, ?> ? 
(Map)convertData.get("WaitLatencies") : Collections.emptyMap();
             for (Map.Entry<Object, Object> entry : droppedMessages.entrySet())
             {
-                out.printf("%-20s%10s%n", entry.getKey(), entry.getValue());
+                out.printf("%-20s%10s", entry.getKey(), entry.getValue());
+                if (waitLatencies.containsKey(entry.getKey()))
+                {
+                    double[] latencies = waitLatencies.get(entry.getKey());
+                    out.printf("%18.2f%18.2f%18.2f%18.2f", latencies[0], 
latencies[2], latencies[4], latencies[6]);
+                }
+                else
+                {
+                    out.printf("%18s%18s%18s%18s", "N/A", "N/A", "N/A", "N/A");
+                }
+
+                out.printf("%n");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 11d17b8..8f8e97e 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -129,6 +129,37 @@ public class MessagingServiceTest
     }
 
     @Test
+    public void testQueueWaitLatency() throws Exception
+    {
+        int latency = 100;
+        String verb = MessagingService.Verb.MUTATION.toString();
+
+        ConcurrentHashMap<String, Timer> queueWaitLatency = 
MessagingService.instance().metrics.queueWaitLatency;
+        queueWaitLatency.clear();
+
+        assertNull(queueWaitLatency.get(verb));
+        MessagingService.instance().metrics.addQueueWaitTime(verb, latency);
+        assertNotNull(queueWaitLatency.get(verb));
+        assertEquals(1, queueWaitLatency.get(verb).getCount());
+        long expectedBucket = 
bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, 
TimeUnit.MILLISECONDS.toNanos(latency))) - 1];
+        assertEquals(expectedBucket, 
queueWaitLatency.get(verb).getSnapshot().getMax());
+    }
+
+    @Test
+    public void testNegativeQueueWaitLatency() throws Exception
+    {
+        int latency = -100;
+        String verb = MessagingService.Verb.MUTATION.toString();
+
+        ConcurrentHashMap<String, Timer> queueWaitLatency = 
MessagingService.instance().metrics.queueWaitLatency;
+        queueWaitLatency.clear();
+
+        assertNull(queueWaitLatency.get(verb));
+        MessagingService.instance().metrics.addQueueWaitTime(verb, latency);
+        assertNull(queueWaitLatency.get(verb));
+    }
+
+    @Test
     public void 
testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
     {
         MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();

Reply via email to