Repository: cassandra Updated Branches: refs/heads/trunk e31e21623 -> 04afa2bf5
Add cross-DC latency metrics Patch by Chris Lohfink, reviewed by Carl Yeksigian for CASSANDRA-11596 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04afa2bf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04afa2bf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04afa2bf Branch: refs/heads/trunk Commit: 04afa2bf52ce6f5a534323678defd625dca67336 Parents: e31e216 Author: Chris Lohfink <chris.lohf...@datastax.com> Authored: Wed May 18 16:00:04 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Thu Jun 16 10:49:24 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/metrics/MessagingMetrics.java | 59 +++++++++++++++++++ .../cassandra/net/IncomingTcpConnection.java | 2 +- .../org/apache/cassandra/net/MessageIn.java | 9 ++- .../apache/cassandra/net/MessagingService.java | 3 + .../cassandra/net/MessagingServiceTest.java | 62 +++++++++++++++++++- 6 files changed, 131 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9c44a63..08b5e4a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.8 + * Add cross-DC latency metrics (CASSANDRA-11596) * Allow terms in selection clause (CASSANDRA-10783) * Add bind variables to trace (CASSANDRA-11719) * Switch counter shards' clock to timestamps (CASSANDRA-9811) http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/metrics/MessagingMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java new file mode 100644 index 
0000000..e126c93 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.metrics; + +import java.net.InetAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +/** + * Metrics for messages: one aggregate {@code CrossNodeLatency} timer for all senders, plus one timer per sending datacenter (registered on first use under the name {@code <dc>-Latency}); both are fed by {@link #addTimeTaken}. + */ +public class MessagingMetrics +{ + /* NOTE(review): this logger is never referenced in this class and is not final; candidate for removal. */ private static Logger logger = LoggerFactory.getLogger(MessagingMetrics.class); + private static final MetricNameFactory factory = new DefaultNameFactory("Messaging"); + /** Latency of every recorded message regardless of origin; samples are recorded in milliseconds. */ public final Timer crossNodeLatency; + /** Per-datacenter latency timers, keyed by the datacenter name the endpoint snitch reports for the sender; entries are added lazily by addTimeTaken. */ public final ConcurrentHashMap<String, Timer> dcLatency; + + /** Registers the aggregate timer; per-datacenter timers are created on demand. */ public MessagingMetrics() + { + crossNodeLatency = Metrics.timer(factory.createMetricName("CrossNodeLatency")); + dcLatency = new ConcurrentHashMap<>(); + } + + /** Records one cross-node latency sample: resolves the sender's datacenter via the configured endpoint snitch, registers that datacenter's timer if it does not exist yet, and updates both it and the aggregate timer. @param from endpoint the message was received from @param timeTaken observed latency in milliseconds */ public void addTimeTaken(InetAddress from, long timeTaken) + { + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(from); + /* plain get() first so computeIfAbsent is only reached the first time a datacenter is seen */ Timer timer = dcLatency.get(dc);
+ if (timer == null) + { + timer = dcLatency.computeIfAbsent(dc, k -> Metrics.timer(factory.createMetricName(dc + "-Latency"))); + } + timer.update(timeTaken, TimeUnit.MILLISECONDS); + crossNodeLatency.update(timeTaken, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 2a09bf4..9e8e2e1 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -187,7 +187,7 @@ public class IncomingTcpConnection extends Thread implements Closeable else id = input.readInt(); - MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(input)); + MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(from, input, System.currentTimeMillis())); if (message == null) { // callback expired; nothing to do http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index c6e4d89..014ee93 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -120,14 +120,17 @@ public class MessageIn<T> return new ConstructionTime(); } - public static ConstructionTime readTimestamp(DataInputPlus input) throws IOException + public static ConstructionTime readTimestamp(InetAddress from, DataInputPlus input, long timestamp) throws IOException { // make sure to readInt, even if cross_node_to is not enabled int partial = input.readInt(); + long crossNodeTimestamp = (timestamp &
0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); + if (timestamp > crossNodeTimestamp) + { + MessagingService.instance().metrics.addTimeTaken(from, timestamp - crossNodeTimestamp); + } if(DatabaseDescriptor.hasCrossNodeTimeout()) { - long timestamp = System.currentTimeMillis(); - long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); return new ConstructionTime(crossNodeTimestamp, timestamp != crossNodeTimestamp); } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index f930436..3828bad 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -67,6 +67,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.metrics.DroppedMessageMetrics; +import org.apache.cassandra.metrics.MessagingMetrics; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; @@ -101,6 +102,8 @@ public final class MessagingService implements MessagingServiceMBean private boolean allNodesAtLeast22 = true; private boolean allNodesAtLeast30 = true; + public final MessagingMetrics metrics = new MessagingMetrics(); + /* All verb handler identifiers */ public enum Verb { http://git-wip-us.apache.org/repos/asf/cassandra/blob/04afa2bf/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 909bd79..ef51f30 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -20,15 +20,29 @@ */ package org.apache.cassandra.net; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Timer; + +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.caffinitas.ohc.histo.EstimatedHistogram; import org.junit.Test; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class MessagingServiceTest { private final MessagingService messagingService = MessagingService.test(); + /* Bucket offsets used to predict a timer snapshot's max in testDCLatency. NOTE(review): taken from OHC's EstimatedHistogram(160); assumes these match the buckets of the reservoir behind Metrics.timer — confirm. */ private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets(); @Test public void testDroppedMessages() @@ -54,4 +68,50 @@ public class MessagingServiceTest assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString())); } + /** Writes the low 32 bits of {@code sentAt} with writeInt, then passes them to MessageIn.readTimestamp with the local host as sender and {@code now} as the receive time, so the latency-metric side effect can be asserted on. */ private static void addDCLatency(long sentAt, long now) throws IOException + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos)) + { + out.writeInt((int) sentAt); + } + DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray())); + MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now); + } + + @Test + /** A send time 100 ms before the receive time should create the sender's datacenter timer holding exactly one sample. */ public void testDCLatency() throws Exception + { + int latency = 100; + + ConcurrentHashMap<String, Timer> dcLatency = MessagingService.instance().metrics.dcLatency; + dcLatency.clear(); + + long now = System.currentTimeMillis(); + long
sentAt = now - latency; + + /* NOTE(review): assumes the test snitch reports "datacenter1" for InetAddress.getLocalHost() — confirm against the test configuration. */ assertNull(dcLatency.get("datacenter1")); + addDCLatency(sentAt, now); + assertNotNull(dcLatency.get("datacenter1")); + assertEquals(1, dcLatency.get("datacenter1").getCount()); + /* Presumably the snapshot max is reported at bucket resolution: when 100 ms (in ns) is not itself an offset, binarySearch returns -(insertionPoint)-1, so abs(...)-1 selects the first offset above it. NOTE(review): if the value were exactly an offset, binarySearch returns its non-negative index and this would select the preceding bucket instead. */ long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1]; + assertEquals(expectedBucket, dcLatency.get("datacenter1").getSnapshot().getMax()); + } + + @Test + public void testNegativeDCLatency() throws Exception + { + // if clocks are off should just not track anything (here the send time is 100 ms after the receive time, so no per-DC timer should be created) + int latency = -100; + + ConcurrentHashMap<String, Timer> dcLatency = MessagingService.instance().metrics.dcLatency; + dcLatency.clear(); + + long now = System.currentTimeMillis(); + long sentAt = now - latency; + + assertNull(dcLatency.get("datacenter1")); + addDCLatency(sentAt, now); + assertNull(dcLatency.get("datacenter1")); + } }