Repository: cassandra
Updated Branches:
  refs/heads/trunk 04a99ab84 -> 3c8d87f43


Add latency logging for dropped messages

Patch by akale; reviewed by pmotta for CASSANDRA-10580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c8d87f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c8d87f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c8d87f4

Branch: refs/heads/trunk
Commit: 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20
Parents: 04a99ab
Author: anubhavkale <[email protected]>
Authored: Thu Dec 10 12:28:45 2015 -0800
Committer: Joshua McKenzie <[email protected]>
Committed: Wed Dec 23 13:15:10 2015 -0500

----------------------------------------------------------------------
 .../cassandra/net/MessageDeliveryTask.java      | 42 +++++++++++++++++--
 .../apache/cassandra/service/StorageProxy.java  | 44 ++++++++++++++++++--
 2 files changed, 79 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8d87f4/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 818cfc6..bede3d8 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,11 +18,13 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -43,10 +45,11 @@ public class MessageDeliveryTask implements Runnable
 
     public void run()
     {
+        long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp; // ms elapsed since message.constructionTime
         MessagingService.Verb verb = message.verb;
-        if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
+        if (MessagingService.DROPPABLE_VERBS.contains(verb) && timeTaken > message.getTimeout()) // expired: age exceeds the timeout
         {
+            LogDroppedMessageDetails(timeTaken);
             MessagingService.instance().incrementDroppedMessages(message);
             return;
         }
@@ -82,6 +85,37 @@ public class MessageDeliveryTask implements Runnable
             
Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
     }
 
+    /**
+     * Logs at debug level how late this task ran and the KS and CF ids of any mutations in the dropped payload.
+     * @param timeTaken milliseconds between the message's construction timestamp and this task running
+     */
+    private void LogDroppedMessageDetails(long timeTaken)
+    {
+        // Everything below only feeds logger.debug, so skip the payload walk and string building when it is off
+        if (!logger.isDebugEnabled())
+            return;
+        logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} ms. Dropping message {}",
+                     timeTaken, message.getTimeout(), message);
+        // Print KS and CF if Payload is mutation or a list of mutations (sent due to schema announcements)
+        if (message.payload instanceof IMutation)
+        {
+            logDroppedMutation((IMutation) message.payload);
+        }
+        else if (message.payload instanceof Collection<?>)
+        {
+            for (Object payloadItem : (Collection<?>) message.payload)
+                if (payloadItem instanceof IMutation) // instanceof is false for null, no extra null check needed
+                    logDroppedMutation((IMutation) payloadItem);
+        }
+    }
+
+    /** Logs the keyspace name and column family ids of one dropped mutation. */
+    private void logDroppedMutation(IMutation mutation)
+    {
+        logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}",
+                     mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
+    }
+
     private void handleFailure(Throwable t)
     {
         if (message.doCallbackOnFailure())
@@ -95,4 +129,4 @@ public class MessageDeliveryTask implements Runnable
     private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = 
EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                
   MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                
   MessagingService.Verb.GOSSIP_DIGEST_SYN);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8d87f4/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f161607..1c30cd7 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            performLocally(stage, mutation::apply, responseHandler);
+            performLocally(stage, mutation, mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1286,6 +1286,37 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
+    /**
+     * Submits {@code runnable} to the given stage wrapped in a {@link LocalMutationRunnable} and reports the
+     * outcome to {@code handler}.
+     *
+     * @param stage    stage passed to {@code StageManager.getStage}; the task goes through maybeExecuteImmediately
+     * @param mutation mutation handed to the LocalMutationRunnable constructor
+     * @param runnable work to run locally
+     * @param handler  receives {@code response(null)} after a successful run, {@code onFailure} after any Exception
+     */
+    private static void performLocally(Stage stage, IMutation mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+    {
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
+        {
+            public void runMayThrow()
+            {
+                try
+                {
+                    runnable.run();
+                    handler.response(null);
+                }
+                catch (Exception ex)
+                {
+                    // No "{}" here: a lone Throwable argument selects error(String, Throwable), which never fills placeholders
+                    if (!(ex instanceof WriteTimeoutException))
+                        logger.error("Failed to apply mutation locally : ", ex);
+                    handler.onFailure(FBUtilities.getBroadcastAddress());
+                }
+            }
+        });
+    }
+
     /**
      * Handle counter mutation on the coordinator host.
      *
@@ -2408,11 +2429,33 @@ public class StorageProxy implements StorageProxyMBean
     private static abstract class LocalMutationRunnable implements Runnable
     {
         private final long constructionTime = System.currentTimeMillis();
+        // Mutation whose KS and CF are logged if this task is dropped; null when the no-arg constructor is used
+        private final IMutation mutation;
+
+        /** Creates a task that can log the KS and CF of {@code mutation} if it is dropped. */
+        public LocalMutationRunnable(IMutation mutation)
+        {
+            this.mutation = mutation;
+        }
+
+        /** Creates a task with no mutation details to log. */
+        public LocalMutationRunnable()
+        {
+            this(null);
+        }
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
+            // Read the clock once so the drop decision and the logged latency agree
+            long timeTaken = System.currentTimeMillis() - constructionTime;
+            if (timeTaken > mutationTimeout)
             {
+                logger.debug("LocalMutationRunnable thread ran after {} ms, allowed time was {} ms. ", timeTaken, mutationTimeout);
+                if (this.mutation != null)
+                {
+                    logger.debug("LocalMutationRunnable dropped mutation of KS {}, CF {}", this.mutation.getKeyspaceName(), Arrays.toString(this.mutation.getColumnFamilyIds().toArray()));
+                }
                 
MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
                 HintRunnable runnable = new 
HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
@@ -2596,4 +2634,4 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
-}
+}
\ No newline at end of file

Reply via email to