Repository: cassandra
Updated Branches:
  refs/heads/trunk e31e21623 -> 04afa2bf5


Add cross-DC latency metrics

Patch by Chris Lohfink, reviewed by Carl Yeksigian for CASSANDRA-11596


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04afa2bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04afa2bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04afa2bf

Branch: refs/heads/trunk
Commit: 04afa2bf52ce6f5a534323678defd625dca67336
Parents: e31e216
Author: Chris Lohfink <chris.lohf...@datastax.com>
Authored: Wed May 18 16:00:04 2016 -0500
Committer: Carl Yeksigian <c...@apache.org>
Committed: Thu Jun 16 10:49:24 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/metrics/MessagingMetrics.java     | 59 +++++++++++++++++++
 .../cassandra/net/IncomingTcpConnection.java    |  2 +-
 .../org/apache/cassandra/net/MessageIn.java     |  9 ++-
 .../apache/cassandra/net/MessagingService.java  |  3 +
 .../cassandra/net/MessagingServiceTest.java     | 62 +++++++++++++++++++-
 6 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c44a63..08b5e4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.8
+ * Add cross-DC latency metrics (CASSANDRA-11596)
  * Allow terms in selection clause (CASSANDRA-10783)
  * Add bind variables to trace (CASSANDRA-11719)
  * Switch counter shards' clock to timestamps (CASSANDRA-9811)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
new file mode 100644
index 0000000..e126c93
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for messages
+ */
+public class MessagingMetrics
+{
+    private static Logger logger = LoggerFactory.getLogger(MessagingMetrics.class);
+    private static final MetricNameFactory factory = new DefaultNameFactory("Messaging");
+    public final Timer crossNodeLatency;
+    public final ConcurrentHashMap<String, Timer> dcLatency;
+
+    public MessagingMetrics()
+    {
+        crossNodeLatency = Metrics.timer(factory.createMetricName("CrossNodeLatency"));
+        dcLatency = new ConcurrentHashMap<>();
+    }
+
+    public void addTimeTaken(InetAddress from, long timeTaken)
+    {
+        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(from);
+        Timer timer = dcLatency.get(dc);
+        if (timer == null)
+        {
+            timer = dcLatency.computeIfAbsent(dc, k -> Metrics.timer(factory.createMetricName(dc + "-Latency")));
+        }
+        timer.update(timeTaken, TimeUnit.MILLISECONDS);
+        crossNodeLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 2a09bf4..9e8e2e1 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -187,7 +187,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         else
             id = input.readInt();
 
-        MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(input));
+        MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(from, input, System.currentTimeMillis()));
         if (message == null)
         {
             // callback expired; nothing to do

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index c6e4d89..014ee93 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -120,14 +120,17 @@ public class MessageIn<T>
         return new ConstructionTime();
     }
 
-    public static ConstructionTime readTimestamp(DataInputPlus input) throws IOException
+    public static ConstructionTime readTimestamp(InetAddress from, DataInputPlus input, long timestamp) throws IOException
     {
         // make sure to readInt, even if cross_node_to is not enabled
         int partial = input.readInt();
+        long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+        if (timestamp > crossNodeTimestamp)
+        {
+            MessagingService.instance().metrics.addTimeTaken(from, timestamp - crossNodeTimestamp);
+        }
         if(DatabaseDescriptor.hasCrossNodeTimeout())
         {
-            long timestamp = System.currentTimeMillis();
-            long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
             return new ConstructionTime(crossNodeTimestamp, timestamp != crossNodeTimestamp);
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f930436..3828bad 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -67,6 +67,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
+import org.apache.cassandra.metrics.MessagingMetrics;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
@@ -101,6 +102,8 @@ public final class MessagingService implements MessagingServiceMBean
     private boolean allNodesAtLeast22 = true;
     private boolean allNodesAtLeast30 = true;
 
+    public final MessagingMetrics metrics = new MessagingMetrics();
+
     /* All verb handler identifiers */
     public enum Verb
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 909bd79..ef51f30 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -20,15 +20,29 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.Timer;
+
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.caffinitas.ohc.histo.EstimatedHistogram;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 public class MessagingServiceTest
 {
     private final MessagingService messagingService = MessagingService.test();
+    private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
 
     @Test
     public void testDroppedMessages()
@@ -54,4 +68,50 @@ public class MessagingServiceTest
         assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
     }
 
+    private static void addDCLatency(long sentAt, long now) throws IOException
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
+        {
+            out.writeInt((int) sentAt);
+        }
+        DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray()));
+        MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
+    }
+
+    @Test
+    public void testDCLatency() throws Exception
+    {
+        int latency = 100;
+
+        ConcurrentHashMap<String, Timer> dcLatency = MessagingService.instance().metrics.dcLatency;
+        dcLatency.clear();
+
+        long now = System.currentTimeMillis();
+        long sentAt = now - latency;
+
+        assertNull(dcLatency.get("datacenter1"));
+        addDCLatency(sentAt, now);
+        assertNotNull(dcLatency.get("datacenter1"));
+        assertEquals(1, dcLatency.get("datacenter1").getCount());
+        long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1];
+        assertEquals(expectedBucket, dcLatency.get("datacenter1").getSnapshot().getMax());
+    }
+
+    @Test
+    public void testNegativeDCLatency() throws Exception
+    {
+        // if clocks are off should just not track anything
+        int latency = -100;
+
+        ConcurrentHashMap<String, Timer> dcLatency = MessagingService.instance().metrics.dcLatency;
+        dcLatency.clear();
+
+        long now = System.currentTimeMillis();
+        long sentAt = now - latency;
+
+        assertNull(dcLatency.get("datacenter1"));
+        addDCLatency(sentAt, now);
+        assertNull(dcLatency.get("datacenter1"));
+    }
 }

Reply via email to