Repository: cassandra Updated Branches: refs/heads/cassandra-3.X d2d7299ee -> 9d9a1a122 refs/heads/trunk 8fcedf544 -> 22a3db065
Fix crossNode value when receiving messages patch by Sylvain Lebresne; reviewed by Stefania Alborghetti for CASSANDRA-12791 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d9a1a12 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d9a1a12 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d9a1a12 Branch: refs/heads/cassandra-3.X Commit: 9d9a1a12248eb37affd0cb131c8aa4f658c3bcc9 Parents: d2d7299 Author: Sylvain Lebresne <[email protected]> Authored: Thu Oct 27 11:23:18 2016 +0800 Committer: Stefania Alborghetti <[email protected]> Committed: Thu Nov 3 16:09:34 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/ReadCommandVerbHandler.java | 4 +- .../db/monitoring/ConstructionTime.java | 41 ----------- .../cassandra/db/monitoring/Monitorable.java | 3 +- .../db/monitoring/MonitorableImpl.java | 22 ++++-- .../cassandra/db/monitoring/MonitoringTask.java | 10 +-- .../cassandra/net/IncomingTcpConnection.java | 2 +- .../cassandra/net/MessageDeliveryTask.java | 4 +- .../org/apache/cassandra/net/MessageIn.java | 71 +++++++++++++------- .../apache/cassandra/net/MessagingService.java | 44 ++++++------ .../apache/cassandra/service/ReadCallback.java | 3 +- .../apache/cassandra/service/StorageProxy.java | 3 +- .../db/monitoring/MonitoringTaskTest.java | 56 ++++++++------- .../org/apache/cassandra/hints/HintTest.java | 4 +- .../cassandra/hints/HintsServiceTest.java | 3 +- .../apache/cassandra/net/MatcherResponse.java | 2 +- .../cassandra/net/MessagingServiceTest.java | 16 ++--- .../cassandra/net/MockMessagingServiceTest.java | 6 +- .../cassandra/service/DataResolverTest.java | 3 +- 19 files changed, 147 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 25facd5..40aace3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Fix crossNode value when receiving messages (CASSANDRA-12791) * Don't load MX4J beans twice (CASSANDRA-12869) * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838) * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 7948590..a71e92d 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -41,7 +41,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> } ReadCommand command = message.payload; - command.setMonitoringTime(message.constructionTime, message.getTimeout(), message.getSlowQueryTimeout()); + command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout()); ReadResponse response; try (ReadExecutionController executionController = command.executionController(); @@ -53,7 +53,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> if (!command.complete()) { Tracing.trace("Discarding partial response to {} (timed out)", message.from); - MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp); + MessagingService.instance().incrementDroppedMessages(message, message.getLifetimeInMS()); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java b/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java deleted file mode 100644 index d6b6078..0000000 --- a/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.monitoring; - -public final class ConstructionTime -{ - public final long timestamp; - public final boolean isCrossNode; - - public ConstructionTime() - { - this(ApproximateTime.currentTimeMillis()); - } - - public ConstructionTime(long timestamp) - { - this(timestamp, false); - } - - public ConstructionTime(long timestamp, boolean isCrossNode) - { - this.timestamp = timestamp; - this.isCrossNode = isCrossNode; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/Monitorable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java index f4c5ee8..c9bf94e 100644 --- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java +++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java @@ -21,7 +21,7 @@ package org.apache.cassandra.db.monitoring; public interface Monitorable { String name(); - ConstructionTime constructionTime(); + long constructionTime(); long timeout(); long slowTimeout(); @@ -29,6 +29,7 @@ public interface Monitorable boolean isAborted(); boolean isCompleted(); boolean isSlow(); + boolean isCrossNode(); boolean abort(); boolean complete(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java index 7363e10..48c8152 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java @@ -22,9 +22,10 @@ public abstract class MonitorableImpl implements Monitorable { private MonitoringState state; private boolean isSlow; - private ConstructionTime constructionTime; + private long constructionTime = -1; private long timeout; private long slowTimeout; + private boolean isCrossNode; protected MonitorableImpl() { @@ -37,14 +38,16 @@ public abstract class MonitorableImpl implements Monitorable * is too complex, it would require passing new parameters to all serializers * or specializing the serializers to accept these message properties. */ - public void setMonitoringTime(ConstructionTime constructionTime, long timeout, long slowTimeout) + public void setMonitoringTime(long constructionTime, boolean isCrossNode, long timeout, long slowTimeout) { + assert constructionTime >= 0; this.constructionTime = constructionTime; + this.isCrossNode = isCrossNode; this.timeout = timeout; this.slowTimeout = slowTimeout; } - public ConstructionTime constructionTime() + public long constructionTime() { return constructionTime; } @@ -54,6 +57,11 @@ public abstract class MonitorableImpl implements Monitorable return timeout; } + public boolean isCrossNode() + { + return isCrossNode; + } + public long slowTimeout() { return slowTimeout; @@ -87,7 +95,7 @@ public abstract class MonitorableImpl implements Monitorable { if (state == MonitoringState.IN_PROGRESS) { - if (constructionTime != null) + if (constructionTime >= 0) MonitoringTask.addFailedOperation(this, ApproximateTime.currentTimeMillis()); state = MonitoringState.ABORTED; @@ -101,7 +109,7 @@ public abstract class MonitorableImpl implements Monitorable { if (state == MonitoringState.IN_PROGRESS) { - if (isSlow && slowTimeout > 0 && constructionTime != null) + if (isSlow && slowTimeout > 0 && constructionTime >= 0) MonitoringTask.addSlowOperation(this, ApproximateTime.currentTimeMillis()); state = MonitoringState.COMPLETED; @@ -113,10 +121,10 @@ public abstract class MonitorableImpl implements Monitorable private void check() { - if (constructionTime == null || state != MonitoringState.IN_PROGRESS) + if (constructionTime < 0 || state != MonitoringState.IN_PROGRESS) return; - long elapsed = ApproximateTime.currentTimeMillis() - constructionTime.timestamp; + long elapsed = ApproximateTime.currentTimeMillis() - constructionTime; if (elapsed >= slowTimeout && !isSlow) isSlow = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java index b116485..9426042 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java @@ -330,7 +330,7 @@ class MonitoringTask { this.operation = operation; numTimesReported = 1; - totalTime = failedAt - operation.constructionTime().timestamp; + totalTime = failedAt - operation.constructionTime(); minTime = totalTime; maxTime = totalTime; } @@ -370,7 +370,7 @@ class MonitoringTask name(), totalTime, operation.timeout(), - operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); + operation.isCrossNode() ? "msec/cross-node" : "msec"); else return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s", name(), @@ -379,7 +379,7 @@ class MonitoringTask minTime, maxTime, operation.timeout(), - operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); + operation.isCrossNode() ? "msec/cross-node" : "msec"); } } @@ -400,7 +400,7 @@ class MonitoringTask name(), totalTime, operation.slowTimeout(), - operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); + operation.isCrossNode() ? "msec/cross-node" : "msec"); else return String.format("<%s>, was slow %d times: avg/min/max %d/%d/%d msec - slow timeout %d %s", name(), @@ -409,7 +409,7 @@ class MonitoringTask minTime, maxTime, operation.slowTimeout(), - operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); + operation.isCrossNode() ? "msec/cross-node" : "msec"); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index cd80e00..7d3c607 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -188,7 +188,7 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos else id = input.readInt(); - MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(from, input, System.currentTimeMillis())); + MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input)); if (message == null) { // callback expired; nothing to do http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index c97a98f..c91e9da 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -46,7 +46,7 @@ public class MessageDeliveryTask implements Runnable public void run() { MessagingService.Verb verb = message.verb; - long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp; + long timeTaken = message.getLifetimeInMS(); if (MessagingService.DROPPABLE_VERBS.contains(verb) && timeTaken > message.getTimeout()) { @@ -82,7 +82,7 @@ public class MessageDeliveryTask implements Runnable } if (GOSSIP_VERBS.contains(message.verb)) - Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp); + Gossiper.instance.setLastProcessedMessageAt(message.constructionTime); } private void handleFailure(Throwable t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index 0562df6..a254741 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -26,7 +26,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.monitoring.ConstructionTime; +import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; @@ -39,14 +39,14 @@ public class MessageIn<T> public final Map<String, byte[]> parameters; public final MessagingService.Verb verb; public final int version; - public final ConstructionTime constructionTime; + public final long constructionTime; private MessageIn(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version, - ConstructionTime constructionTime) + long constructionTime) { this.from = from; this.payload = payload; @@ -61,17 +61,26 @@ public class MessageIn<T> Map<String, byte[]> parameters, MessagingService.Verb verb, int version, - ConstructionTime constructionTime) + long constructionTime) { return new MessageIn<>(from, payload, parameters, verb, version, constructionTime); } + public static <T> MessageIn<T> create(InetAddress from, + T payload, + Map<String, byte[]> parameters, + MessagingService.Verb verb, + int version) + { + return new MessageIn<>(from, payload, parameters, verb, version, ApproximateTime.currentTimeMillis()); + } + public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id) throws IOException { - return read(in, version, id, new ConstructionTime()); + return read(in, version, id, ApproximateTime.currentTimeMillis()); } - public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, ConstructionTime constructionTime) throws IOException + public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException { InetAddress from = CompactEndpointSerializationHelper.deserialize(in); @@ -115,28 +124,42 @@ public class MessageIn<T> return MessageIn.create(from, payload, parameters, verb, version, constructionTime); } - public static ConstructionTime createTimestamp() + public static long readConstructionTime(InetAddress from, DataInputPlus input) throws IOException { - return new ConstructionTime(); + long currentTime = ApproximateTime.currentTimeMillis(); + + // Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the + // higher 4 bytes wouldn't change between the sender and receiver) + int partial = input.readInt(); // make sure to readInt, even if cross_node_to is not enabled + long sentConstructionTime = (currentTime & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); + + // Because nodes may not have their clock perfectly in sync, it's actually possible the sentConstructionTime is + // later than the currentTime (the received time). If that's the case, as we definitively know there is a lack + // of proper synchronziation of the clock, we ignore sentConstructionTime. We also ignore that + // sentConstructionTime if we're told to. + long elapsed = currentTime - sentConstructionTime; + if (elapsed > 0) + MessagingService.instance().metrics.addTimeTaken(from, elapsed); + + boolean useSentTime = DatabaseDescriptor.hasCrossNodeTimeout() && elapsed > 0; + return useSentTime ? sentConstructionTime : currentTime; } - public static ConstructionTime readTimestamp(InetAddress from, DataInputPlus input, long timestamp) throws IOException + /** + * Since how long (in milliseconds) the message has lived. + */ + public long getLifetimeInMS() { - // make sure to readInt, even if cross_node_to is not enabled - int partial = input.readInt(); - long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); - if (timestamp > crossNodeTimestamp) - { - MessagingService.instance().metrics.addTimeTaken(from, timestamp - crossNodeTimestamp); - } - if(DatabaseDescriptor.hasCrossNodeTimeout()) - { - return new ConstructionTime(crossNodeTimestamp, timestamp != crossNodeTimestamp); - } - else - { - return new ConstructionTime(); - } + return ApproximateTime.currentTimeMillis() - constructionTime; + } + + /** + * Whether the message has crossed the node boundary, that is whether it originated from another node. + * + */ + public boolean isCrossNode() + { + return !from.equals(DatabaseDescriptor.getBroadcastAddress()); } public Stage getMessageType() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index ba40a58..f82e80b 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -426,14 +426,14 @@ public final class MessagingService implements MessagingServiceMBean private static final class DroppedMessages { final DroppedMessageMetrics metrics; - final AtomicInteger droppedInternalTimeout; - final AtomicInteger droppedCrossNodeTimeout; + final AtomicInteger droppedInternal; + final AtomicInteger droppedCrossNode; DroppedMessages(Verb verb) { this.metrics = new DroppedMessageMetrics(verb); - this.droppedInternalTimeout = new AtomicInteger(0); - this.droppedCrossNodeTimeout = new AtomicInteger(0); + this.droppedInternal = new AtomicInteger(0); + this.droppedCrossNode = new AtomicInteger(0); } } @@ -1160,19 +1160,19 @@ public final class MessagingService implements MessagingServiceMBean { updateDroppedMutationCount((IMutation) message.payload); } - incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode); + incrementDroppedMessages(message.verb, timeTaken, message.isCrossNode()); } - public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNodeTimeout) + public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNode) { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; - incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNodeTimeout); + incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNode); } - public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout) + public void incrementDroppedMessages(Verb verb, boolean isCrossNode) { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; - incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout); + incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNode); } private void updateDroppedMutationCount(IMutation mutation) @@ -1189,22 +1189,22 @@ public final class MessagingService implements MessagingServiceMBean } } - private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout) + private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNode) { - if (isCrossNodeTimeout) + if (isCrossNode) droppedMessages.metrics.crossNodeDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS); else droppedMessages.metrics.internalDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS); - incrementDroppedMessages(droppedMessages, isCrossNodeTimeout); + incrementDroppedMessages(droppedMessages, isCrossNode); } - private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout) + private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNode) { droppedMessages.metrics.dropped.mark(); - if (isCrossNodeTimeout) - droppedMessages.droppedCrossNodeTimeout.incrementAndGet(); + if (isCrossNode) + droppedMessages.droppedCrossNode.incrementAndGet(); else - droppedMessages.droppedInternalTimeout.incrementAndGet(); + droppedMessages.droppedInternal.incrementAndGet(); } private void logDroppedMessages() @@ -1226,16 +1226,16 @@ public final class MessagingService implements MessagingServiceMBean Verb verb = entry.getKey(); DroppedMessages droppedMessages = entry.getValue(); - int droppedInternalTimeout = droppedMessages.droppedInternalTimeout.getAndSet(0); - int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0); - if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0) + int droppedInternal = droppedMessages.droppedInternal.getAndSet(0); + int droppedCrossNode = droppedMessages.droppedCrossNode.getAndSet(0); + if (droppedInternal > 0 || droppedCrossNode > 0) { - ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout." + ret.add(String.format("%s messages were dropped in last %d ms: %d internal and %d cross node." + " Mean internal dropped latency: %d ms and Mean cross-node dropped latency: %d ms", verb, LOG_DROPPED_INTERVAL_IN_MS, - droppedInternalTimeout, - droppedCrossNodeTimeout, + droppedInternal, + droppedCrossNode, TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.internalDroppedLatency.getSnapshot().getMean()), TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.crossNodeDroppedLatency.getSnapshot().getMean()))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index b5cb477..11c0b12 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -198,8 +198,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> result, Collections.<String, byte[]>emptyMap(), MessagingService.Verb.INTERNAL_RESPONSE, - MessagingService.current_version, - MessageIn.createTimestamp()); + MessagingService.current_version); response(message); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 529e4e3..e0be68c 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -52,7 +52,6 @@ import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; -import org.apache.cassandra.db.monitoring.ConstructionTime; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.ViewUtils; @@ -1876,7 +1875,7 @@ public class StorageProxy implements StorageProxyMBean { try { - command.setMonitoringTime(new ConstructionTime(constructionTime), verb.getTimeout(), DatabaseDescriptor.getSlowQueryTimeout()); + command.setMonitoringTime(constructionTime, false, verb.getTimeout(), DatabaseDescriptor.getSlowQueryTimeout()); ReadResponse response; try (ReadExecutionController executionController = command.executionController(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java index 14659e3..acc988f 100644 --- a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java +++ b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java @@ -65,10 +65,10 @@ public class MonitoringTaskTest { private final String name; - TestMonitor(String name, ConstructionTime constructionTime, long timeout, long slow) + TestMonitor(String name, long timestamp, boolean isCrossNode, long timeout, long slow) { this.name = name; - setMonitoringTime(constructionTime, timeout, slow); + setMonitoringTime(timestamp, isCrossNode, timeout, slow); } public String name() @@ -124,7 +124,7 @@ public class MonitoringTaskTest @Test public void testAbort() throws InterruptedException { - Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test abort", System.currentTimeMillis(), false, timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.isAborted()); @@ -135,7 +135,7 @@ public class MonitoringTaskTest @Test public void testAbortIdemPotent() throws InterruptedException { - Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test abort", System.currentTimeMillis(), false, timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.abort()); @@ -148,7 +148,7 @@ public class MonitoringTaskTest @Test public void testAbortCrossNode() throws InterruptedException { - Monitorable operation = new TestMonitor("Test for cross node", new ConstructionTime(System.currentTimeMillis(), true), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test for cross node", System.currentTimeMillis(), true, timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.isAborted()); @@ -159,7 +159,7 @@ public class MonitoringTaskTest @Test public void testComplete() throws InterruptedException { - Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test complete", System.currentTimeMillis(), false, timeout, slowTimeout); operation.complete(); waitForOperationsToComplete(operation); @@ -171,7 +171,7 @@ public class MonitoringTaskTest @Test public void testCompleteIdemPotent() throws InterruptedException { - Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test complete", System.currentTimeMillis(), false, timeout, slowTimeout); operation.complete(); waitForOperationsToComplete(operation); @@ -185,7 +185,7 @@ public class MonitoringTaskTest @Test public void testReportSlow() throws InterruptedException { - Monitorable operation = new TestMonitor("Test report slow", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test report slow", System.currentTimeMillis(), false, timeout, slowTimeout); waitForOperationsToBeReportedAsSlow(operation); assertTrue(operation.isSlow()); @@ -199,7 +199,7 @@ public class MonitoringTaskTest public void testNoReportSlowIfZeroSlowTimeout() throws InterruptedException { // when the slow timeout is set to zero then operation won't be reported as slow - Monitorable operation = new TestMonitor("Test report slow disabled", new ConstructionTime(System.currentTimeMillis()), timeout, 0); + Monitorable operation = new TestMonitor("Test report slow disabled", System.currentTimeMillis(), false, timeout, 0); waitForOperationsToBeReportedAsSlow(operation); assertTrue(operation.isSlow()); @@ -212,7 +212,7 @@ public class MonitoringTaskTest @Test public void testReport() throws InterruptedException { - Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation = new TestMonitor("Test report", System.currentTimeMillis(), false, timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.isSlow()); @@ -233,13 +233,13 @@ public class MonitoringTaskTest MonitoringTask.instance = MonitoringTask.make(10, -1); try { - Monitorable operation1 = new TestMonitor("Test report 1", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation1 = new TestMonitor("Test report 1", System.currentTimeMillis(), false, timeout, slowTimeout); waitForOperationsToComplete(operation1); assertTrue(operation1.isAborted()); assertFalse(operation1.isCompleted()); - Monitorable operation2 = new TestMonitor("Test report 2", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + Monitorable operation2 = new TestMonitor("Test report 2", System.currentTimeMillis(), false, timeout, slowTimeout); waitForOperationsToBeReportedAsSlow(operation2); operation2.complete(); @@ -266,7 +266,7 @@ public class MonitoringTaskTest for (int i = 0; i < opCount; i++) { executorService.submit(() -> - operations.add(new TestMonitor(UUID.randomUUID().toString(), new ConstructionTime(), timeout, slowTimeout)) + operations.add(new TestMonitor(UUID.randomUUID().toString(), System.currentTimeMillis(), false, timeout, slowTimeout)) ); } @@ -311,13 +311,17 @@ public class MonitoringTaskTest for (int j = 0; j < numTimes; j++) { Monitorable operation1 = new TestMonitor(operationName, - new ConstructionTime(System.currentTimeMillis()), - timeout, slowTimeout); + System.currentTimeMillis(), + false, + timeout, + slowTimeout); waitForOperationsToComplete(operation1); Monitorable operation2 = new TestMonitor(operationName, - new ConstructionTime(System.currentTimeMillis()), - timeout, slowTimeout); + System.currentTimeMillis(), + false, + timeout, + slowTimeout); waitForOperationsToBeReportedAsSlow(operation2); operation2.complete(); } @@ -362,8 +366,10 @@ public class MonitoringTaskTest try { Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName failed", - new ConstructionTime(System.currentTimeMillis()), - timeout, slowTimeout); + System.currentTimeMillis(), + false, + timeout, + slowTimeout); operations.add(operation); } finally @@ -394,8 +400,10 @@ public class MonitoringTaskTest try { Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName slow", - new ConstructionTime(System.currentTimeMillis()), - timeout, slowTimeout); + System.currentTimeMillis(), + false, + timeout, + slowTimeout); operations.add(operation); } finally @@ -428,8 +436,10 @@ public class MonitoringTaskTest try { Monitorable operation = new TestMonitor("Test thread " + Thread.currentThread().getName(), - new ConstructionTime(System.currentTimeMillis()), - timeout, slowTimeout); + System.currentTimeMillis(), + false, + timeout, + slowTimeout); operations.add(operation); operation.complete(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/hints/HintTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java index 4cc2188..e4a33fd 100644 --- a/test/unit/org/apache/cassandra/hints/HintTest.java +++ b/test/unit/org/apache/cassandra/hints/HintTest.java @@ -232,7 +232,7 @@ public class HintTest // Process hint message. HintMessage message = new HintMessage(localId, hint); MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb( - MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version, MessageIn.createTimestamp()), + MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version), -1); // hint should not be applied as we no longer are a replica @@ -277,7 +277,7 @@ public class HintTest // Process hint message. HintMessage message = new HintMessage(localId, hint); MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb( - MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version, MessageIn.createTimestamp()), + MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version), -1); // hint should not be applied as we no longer are a replica http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/hints/HintsServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java index ffb7f73..077a9d1 100644 --- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java @@ -184,8 +184,7 @@ public class HintsServiceTest HintResponse.instance, Collections.emptyMap(), MessagingService.Verb.REQUEST_RESPONSE, - MessagingService.current_version, - MessageIn.createTimestamp()); + MessagingService.current_version); MockMessagingSpy spy; if (noOfResponses != -1) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/net/MatcherResponse.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java index c8984eb..21a75c9 100644 --- a/test/unit/org/apache/cassandra/net/MatcherResponse.java +++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java @@ -106,7 +106,7 @@ public class MatcherResponse if (payload == null) return null; else - return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version, MessageIn.createTimestamp()); + return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version); }, limit); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 2a3ecbe..ec27b7e 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -82,8 +82,8 @@ public class MessagingServiceTest List<String> logs = messagingService.getDroppedMessagesLogs(); assertEquals(1, logs.size()); - assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0)); - assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString())); + assertEquals("READ messages were dropped in last 5000 ms: 2500 internal and 2500 cross node. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0)); + assertEquals(5000, (int) messagingService.getDroppedMessages().get(verb.toString())); logs = messagingService.getDroppedMessagesLogs(); assertEquals(0, logs.size()); @@ -92,8 +92,8 @@ public class MessagingServiceTest messagingService.incrementDroppedMessages(verb, i, i % 2 == 0); logs = messagingService.getDroppedMessagesLogs(); - assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0)); - assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString())); + assertEquals("READ messages were dropped in last 5000 ms: 1250 internal and 1250 cross node. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0)); + assertEquals(7500, (int) messagingService.getDroppedMessages().get(verb.toString())); } @Test @@ -108,7 +108,7 @@ public class MessagingServiceTest long sentAt = now - latency; assertNull(dcLatency.get("datacenter1")); - addDCLatency(sentAt, now); + addDCLatency(sentAt); assertNotNull(dcLatency.get("datacenter1")); assertEquals(1, dcLatency.get("datacenter1").getCount()); long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1]; @@ -128,7 +128,7 @@ public class MessagingServiceTest long sentAt = now - latency; assertNull(dcLatency.get("datacenter1")); - addDCLatency(sentAt, now); + addDCLatency(sentAt); assertNull(dcLatency.get("datacenter1")); } @@ -221,7 +221,7 @@ public class MessagingServiceTest assertFalse(MockBackPressureStrategy.applied); } - private static void addDCLatency(long sentAt, long now) throws IOException + private static void addDCLatency(long sentAt) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos)) @@ -229,7 +229,7 @@ public class MessagingServiceTest out.writeInt((int) sentAt); } DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray())); - MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now); + MessageIn.readConstructionTime(InetAddress.getLocalHost(), in); } public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState> http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java index ce94f33..3f6564e 100644 --- a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java @@ -59,9 +59,7 @@ public class MockMessagingServiceTest EchoMessage.instance, Collections.emptyMap(), MessagingService.Verb.ECHO, - MessagingService.current_version, - MessageIn.createTimestamp() - ); + MessagingService.current_version); MockMessagingSpy spy = MockMessagingService .when( all( @@ -94,4 +92,4 @@ public class MockMessagingServiceTest // and return a mocked response assertEquals(1, spy.mockedMessageResponses); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index 93415ba..b7624ca 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -828,8 +828,7 @@ public class DataResolverTest ReadResponse.createRemoteDataResponse(partitionIterator, cmd), Collections.EMPTY_MAP, MessagingService.Verb.REQUEST_RESPONSE, - MessagingService.current_version, - MessageIn.createTimestamp()); + MessagingService.current_version); } private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
