Updated Branches:
  refs/heads/trunk 29974346a -> 317dca584

Add retry mechanism to OTC for non-droppable_verbs
patch by jasobrown; reviewed by jbellis for CASSANDRA-5393


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/262e006e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/262e006e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/262e006e

Branch: refs/heads/trunk
Commit: 262e006eb76ca7fce366fd51310a980fd5312837
Parents: 9fc071a
Author: Jason Brown <[email protected]>
Authored: Thu Apr 18 13:38:49 2013 -0700
Committer: Jason Brown <[email protected]>
Committed: Thu Apr 18 13:38:49 2013 -0700

----------------------------------------------------------------------
 .../cassandra/net/OutboundTcpConnection.java       |   54 ++++++++++++--
 1 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/262e006e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 2da678c..b1a1fe9 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -82,7 +82,7 @@ public class OutboundTcpConnection extends Thread
         expireMessages();
         try
         {
-            backlog.put(new QueuedMessage(message, id, System.currentTimeMillis()));
+            backlog.put(new QueuedMessage(message, id));
         }
         catch (InterruptedException e)
         {
@@ -192,12 +192,31 @@ public class OutboundTcpConnection extends Thread
         }
         catch (Exception e)
         {
-            // Non IO exceptions is likely a programming error so let's not silence it
-            if (!(e instanceof IOException))
-                logger.error("error writing to " + poolReference.endPoint(), e);
-            else if (logger.isDebugEnabled())
-                logger.debug("error writing to " + poolReference.endPoint(), e);
             disconnect();
+            if (e instanceof IOException)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("error writing to " + poolReference.endPoint(), e);
+
+                // if the message was important, such as a repair acknowledgement, put it back on the queue
+                // to retry after re-connecting.  See CASSANDRA-5393
+                if (e instanceof SocketException && qm.shouldRetry())
+                {
+                    try
+                    {
+                        backlog.put(new RetriedQueuedMessage(qm));
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        throw new AssertionError(e1);
+                    }
+                }
+            }
+            else
+            {
+                // Non IO exceptions are likely a programming error so let's not silence them
+                logger.error("error writing to " + poolReference.endPoint(), e);
+            }
         }
     }
 
@@ -366,17 +385,36 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
+    /** messages that have not been retried yet */
     private static class QueuedMessage
     {
         final MessageOut<?> message;
         final String id;
         final long timestamp;
 
-        QueuedMessage(MessageOut<?> message, String id, long timestamp)
+        QueuedMessage(MessageOut<?> message, String id)
         {
             this.message = message;
             this.id = id;
-            this.timestamp = timestamp;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        boolean shouldRetry()
+        {
+            return !MessagingService.DROPPABLE_VERBS.contains(message.verb); // retry only non-droppable verbs; droppable ones may simply be lost (CASSANDRA-5393)
+        }
+    }
+
+    private static class RetriedQueuedMessage extends QueuedMessage
+    {
+        RetriedQueuedMessage(QueuedMessage msg)
+        {
+            super(msg.message, msg.id);
+        }
+
+        boolean shouldRetry()
+        {
+            return false;
         }
     }
 }

Reply via email to