Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.X f1423806e -> 6b6bc6a36
  refs/heads/trunk 48591489d -> 3ea0579d8
Expose time spent waiting in thread pool queue

Patch by Dikang Gu; reviewed by T Jake Luciani for CASSANDRA-8398

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b6bc6a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b6bc6a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b6bc6a3

Branch: refs/heads/cassandra-3.X
Commit: 6b6bc6a36c623b8074f0fb27c656b8c26c27cd7e
Parents: f142380
Author: Dikang Gu <dikan...@gmail.com>
Authored: Wed Nov 30 13:00:49 2016 -0800
Committer: Dikang Gu <dikan...@gmail.com>
Committed: Tue Dec 6 15:02:31 2016 -0800

----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/metrics/MessagingMetrics.java | 16 ++++++++++
.../cassandra/net/MessageDeliveryTask.java | 6 ++++
.../org/apache/cassandra/tools/NodeProbe.java | 14 +++++++++
.../tools/nodetool/stats/TpStatsHolder.java | 13 ++++++++
.../tools/nodetool/stats/TpStatsPrinter.java | 17 +++++++++--
.../cassandra/net/MessagingServiceTest.java | 31 ++++++++++++++++++++
7 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bddd823..8b2bed7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.12 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398) * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969) * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java index e126c93..5f640b9 100644 --- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java @@ -38,11 +38,13 @@ public class MessagingMetrics private static final MetricNameFactory factory = new DefaultNameFactory("Messaging"); public final Timer crossNodeLatency; public final ConcurrentHashMap<String, Timer> dcLatency; + public final ConcurrentHashMap<String, Timer> queueWaitLatency; public MessagingMetrics() { crossNodeLatency = Metrics.timer(factory.createMetricName("CrossNodeLatency")); dcLatency = new ConcurrentHashMap<>(); + queueWaitLatency = new ConcurrentHashMap<>(); } public void addTimeTaken(InetAddress from, long timeTaken) @@ -56,4 +58,18 @@ public class MessagingMetrics timer.update(timeTaken, TimeUnit.MILLISECONDS); crossNodeLatency.update(timeTaken, TimeUnit.MILLISECONDS); } + + public void addQueueWaitTime(String verb, long timeTaken) + { + if (timeTaken < 0) + // the measurement is not accurate, ignore the negative timeTaken + return; + + Timer timer = queueWaitLatency.get(verb); + if (timer == null) + { + timer = queueWaitLatency.computeIfAbsent(verb, k -> Metrics.timer(factory.createMetricName(verb + "-WaitLatency"))); + } + timer.update(timeTaken, TimeUnit.MILLISECONDS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index c91e9da..c7fc991 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -24,6 +24,7 @@ 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; +import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.index.IndexNotAvailableException; @@ -35,17 +36,22 @@ public class MessageDeliveryTask implements Runnable private final MessageIn message; private final int id; + private final long enqueueTime; public MessageDeliveryTask(MessageIn message, int id) { assert message != null; this.message = message; this.id = id; + this.enqueueTime = ApproximateTime.currentTimeMillis(); } public void run() { MessagingService.Verb verb = message.verb; + MessagingService.instance().metrics.addQueueWaitTime(verb.toString(), + ApproximateTime.currentTimeMillis() - enqueueTime); + long timeTaken = message.getLifetimeInMS(); if (MessagingService.DROPPABLE_VERBS.contains(verb) && timeTaken > message.getTimeout()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 8c01891..a48baf8 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1354,6 +1354,20 @@ public class NodeProbe implements AutoCloseable } } + public CassandraMetricsRegistry.JmxTimerMBean getMessagingQueueWaitMetrics(String verb) + { + try + { + return JMX.newMBeanProxy(mbeanServerConn, + new ObjectName("org.apache.cassandra.metrics:name=" + verb + "-WaitLatency,type=Messaging"), + CassandraMetricsRegistry.JmxTimerMBean.class); + } + catch (MalformedObjectNameException e) + { + throw new RuntimeException(e); + } + } + /** * Retrieve Proxy metrics * @param metricName CompletedTasks, PendingTasks, 
BytesCompacted or TotalCompactionsCompleted. http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java index d70b4dd..f3e91dc 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java @@ -39,6 +39,7 @@ public class TpStatsHolder implements StatsHolder HashMap<String, Object> result = new HashMap<>(); HashMap<String, Map<String, Object>> threadPools = new HashMap<>(); HashMap<String, Object> droppedMessage = new HashMap<>(); + HashMap<String, double[]> waitLatencies = new HashMap<>(); for (Map.Entry<String, String> tp : probe.getThreadPools().entries()) { @@ -53,8 +54,20 @@ public class TpStatsHolder implements StatsHolder result.put("ThreadPools", threadPools); for (Map.Entry<String, Integer> entry : probe.getDroppedMessages().entrySet()) + { droppedMessage.put(entry.getKey(), entry.getValue()); + try + { + waitLatencies.put(entry.getKey(), probe.metricPercentilesAsArray(probe.getMessagingQueueWaitMetrics(entry.getKey()))); + } + catch (RuntimeException e) + { + // ignore the exceptions when fetching metrics + } + } + result.put("DroppedMessage", droppedMessage); + result.put("WaitLatencies", waitLatencies); return result; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java index b874746..45fc5ee 100644 --- 
a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java @@ -61,12 +61,25 @@ public class TpStatsPrinter values.get("TotalBlockedTasks")); } - out.printf("%n%-20s%10s%n", "Message type", "Dropped"); + out.printf("%n%-20s%10s%18s%18s%18s%18s%n", "Message type", "Dropped", "", "Latency waiting in queue (micros)", "", ""); + out.printf("%-20s%10s%18s%18s%18s%18s%n", "", "", "50%", "95%", "99%", "Max"); Map<Object, Object> droppedMessages = convertData.get("DroppedMessage") instanceof Map<?, ?> ? (Map)convertData.get("DroppedMessage") : Collections.emptyMap(); + Map<Object, double[]> waitLatencies = convertData.get("WaitLatencies") instanceof Map<?, ?> ? (Map)convertData.get("WaitLatencies") : Collections.emptyMap(); for (Map.Entry<Object, Object> entry : droppedMessages.entrySet()) { - out.printf("%-20s%10s%n", entry.getKey(), entry.getValue()); + out.printf("%-20s%10s", entry.getKey(), entry.getValue()); + if (waitLatencies.containsKey(entry.getKey())) + { + double[] latencies = waitLatencies.get(entry.getKey()); + out.printf("%18.2f%18.2f%18.2f%18.2f", latencies[0], latencies[2], latencies[4], latencies[6]); + } + else + { + out.printf("%18s%18s%18s%18s", "N/A", "N/A", "N/A", "N/A"); + } + + out.printf("%n"); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 11d17b8..8f8e97e 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -129,6 +129,37 @@ public class MessagingServiceTest } @Test + public void testQueueWaitLatency() throws Exception + { + int latency = 100; + String verb = 
MessagingService.Verb.MUTATION.toString(); + + ConcurrentHashMap<String, Timer> queueWaitLatency = MessagingService.instance().metrics.queueWaitLatency; + queueWaitLatency.clear(); + + assertNull(queueWaitLatency.get(verb)); + MessagingService.instance().metrics.addQueueWaitTime(verb, latency); + assertNotNull(queueWaitLatency.get(verb)); + assertEquals(1, queueWaitLatency.get(verb).getCount()); + long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1]; + assertEquals(expectedBucket, queueWaitLatency.get(verb).getSnapshot().getMax()); + } + + @Test + public void testNegativeQueueWaitLatency() throws Exception + { + int latency = -100; + String verb = MessagingService.Verb.MUTATION.toString(); + + ConcurrentHashMap<String, Timer> queueWaitLatency = MessagingService.instance().metrics.queueWaitLatency; + queueWaitLatency.clear(); + + assertNull(queueWaitLatency.get(verb)); + MessagingService.instance().metrics.addQueueWaitTime(verb, latency); + assertNull(queueWaitLatency.get(verb)); + } + + @Test public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException { MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();