Repository: cassandra Updated Branches: refs/heads/trunk 04a99ab84 -> 3c8d87f43
Add latency logging for dropped messages Patch by akale; reviewed by pmotta for CASSANDRA-10580 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c8d87f4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c8d87f4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c8d87f4 Branch: refs/heads/trunk Commit: 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20 Parents: 04a99ab Author: anubhavkale <[email protected]> Authored: Thu Dec 10 12:28:45 2015 -0800 Committer: Joshua McKenzie <[email protected]> Committed: Wed Dec 23 13:15:10 2015 -0500 ---------------------------------------------------------------------- .../cassandra/net/MessageDeliveryTask.java | 42 +++++++++++++++++-- .../apache/cassandra/service/StorageProxy.java | 44 ++++++++++++++++++-- 2 files changed, 79 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8d87f4/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index 818cfc6..bede3d8 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -18,11 +18,13 @@ package org.apache.cassandra.net; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.index.IndexNotAvailableException; @@ -43,10 +45,11 @@ public class MessageDeliveryTask implements Runnable public void run() { + long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp; MessagingService.Verb verb = message.verb; - if (MessagingService.DROPPABLE_VERBS.contains(verb) - && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout()) + if (MessagingService.DROPPABLE_VERBS.contains(verb)&& message.getTimeout() > timeTaken) { + LogDroppedMessageDetails(timeTaken); MessagingService.instance().incrementDroppedMessages(message); return; } @@ -82,6 +85,37 @@ public class MessageDeliveryTask implements Runnable Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp); } + private void LogDroppedMessageDetails(long timeTaken) + { + logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} ms. Dropping message {}", + timeTaken, message.getTimeout(), message.toString()); + // Print KS and CF if Payload is mutation or a list of mutations (sent due to schema announcements) + IMutation mutation; + if (message.payload instanceof IMutation) + { + mutation = (IMutation)message.payload; + if (mutation != null) + { + logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray())); + } + } + else if (message.payload instanceof Collection<?>) + { + Collection<?> payloadItems = (Collection<?>)message.payload; + for (Object payloadItem : payloadItems) + { + if (payloadItem instanceof IMutation) + { + mutation = (IMutation)payloadItem; + if (mutation != null) + { + logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray())); + } + } + } + } + } + private void handleFailure(Throwable t) { if (message.doCallbackOnFailure()) @@ -95,4 +129,4 @@ public class MessageDeliveryTask implements Runnable private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.Verb.GOSSIP_DIGEST_ACK2, MessagingService.Verb.GOSSIP_DIGEST_SYN); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8d87f4/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index f161607..1c30cd7 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean submitHint(mutation, endpointsToHint, responseHandler); if (insertLocal) - performLocally(stage, mutation::apply, responseHandler); + performLocally(stage, mutation, mutation::apply, responseHandler); if (dcGroups != null) { @@ -1286,6 +1286,27 @@ public class StorageProxy implements StorageProxyMBean }); } + private static void performLocally(Stage stage, IMutation mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) + { + StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation) + { + public void runMayThrow() + { + try + { + runnable.run(); + handler.response(null); + } + catch (Exception ex) + { + if (!(ex instanceof WriteTimeoutException)) + logger.error("Failed to apply mutation locally : {}", ex); + handler.onFailure(FBUtilities.getBroadcastAddress()); + } + } + }); + } + /** * Handle counter mutation on the coordinator host. * @@ -2408,11 +2429,28 @@ public class StorageProxy implements StorageProxyMBean private static abstract class LocalMutationRunnable implements Runnable { private final long constructionTime = System.currentTimeMillis(); + private IMutation mutation; + + public LocalMutationRunnable(IMutation mutation) + { + this.mutation = mutation; + } + + public LocalMutationRunnable() + { + } public final void run() { - if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION)) + long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION); + if (System.currentTimeMillis() > constructionTime + mutationTimeout) { + long timeTaken = System.currentTimeMillis() - constructionTime; + logger.debug("LocalMutationRunnable thread ran after {} ms, allowed time was {} ms. ", timeTaken, mutationTimeout); + if (this.mutation != null) + { + logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", this.mutation.getKeyspaceName(), Arrays.toString(this.mutation.getColumnFamilyIds().toArray())); + } MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION); HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress())) { @@ -2596,4 +2634,4 @@ public class StorageProxy implements StorageProxyMBean public long getReadRepairRepairedBackground() { return ReadRepairMetrics.repairedBackground.getCount(); } -} +} \ No newline at end of file
