Add retry mechanism to OutboundTcpConnection (OTC) for non-droppable verbs
patch by jasobrown; reviewed by jbellis for CASSANDRA-5393


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2762f484
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2762f484
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2762f484

Branch: refs/heads/cassandra-1.2
Commit: 2762f484c90152eb23c641732ecf6f650e937bfc
Parents: 3b4b143
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Thu Apr 18 11:32:27 2013 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Thu Apr 18 11:32:27 2013 -0700

----------------------------------------------------------------------
 .../cassandra/net/OutboundTcpConnection.java       |   61 ++++++++++++---
 1 files changed, 51 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2762f484/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 73d0c15..ae9f028 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -25,6 +25,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,7 +73,7 @@ public class OutboundTcpConnection extends Thread
         expireMessages();
         try
         {
-            backlog.put(new Entry(message, id, System.currentTimeMillis()));
+            backlog.put(new Entry(message, id));
         }
         catch (InterruptedException e)
         {
@@ -127,7 +128,7 @@ public class OutboundTcpConnection extends Thread
             if (entry.timestamp < System.currentTimeMillis() - 
DatabaseDescriptor.getRpcTimeout())
                 dropped.incrementAndGet();
             else if (socket != null || connect())
-                writeConnected(m, id);
+                writeConnected(entry, id);
             else
                 // clear out the queue, else gossip messages back up.
                 active.clear();
@@ -149,8 +150,9 @@ public class OutboundTcpConnection extends Thread
         return dropped.get();
     }
 
-    private void writeConnected(Message message, String id)
+    private void writeConnected(Entry entry, String id)
     {
+        Message message = entry.message;
         try
         {
             write(message, id, out);
@@ -162,12 +164,32 @@ public class OutboundTcpConnection extends Thread
         }
         catch (Exception e)
         {
-            // Non IO exceptions is likely a programming error so let's not 
silence it
-            if (!(e instanceof IOException))
-                logger.error("error writing to " + poolReference.endPoint(), 
e);
-            else if (logger.isDebugEnabled())
-                logger.debug("error writing to " + poolReference.endPoint(), 
e);
             disconnect();
+
+            if (e instanceof IOException)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("error writing to " + 
poolReference.endPoint(), e);
+
+                // if the message was important, such as a repair 
acknowledgement, put it back on the queue
+                // to retry after re-connecting.  See CASSANDRA-5393
+                if (e instanceof SocketException && entry.shouldRetry())
+                {
+                    try
+                    {
+                        backlog.put(new RetriedEntry(entry));
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        throw new AssertionError(e1);
+                    }
+                }
+            }
+            else
+            {
+                // Non IO exceptions are likely a programming error so let's 
not silence them
+                logger.error("error writing to " + poolReference.endPoint(), 
e);
+            }
         }
     }
 
@@ -283,17 +305,36 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
+    /** messages that have not been retried yet */
     private static class Entry
     {
         final Message message;
         final String id;
         final long timestamp;
 
-        Entry(Message message, String id, long timestamp)
+        Entry(Message message, String id)
         {
             this.message = message;
             this.id = id;
-            this.timestamp = timestamp;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        boolean shouldRetry()
+        {
+            return 
!MessagingService.DROPPABLE_VERBS.contains(message.getVerb());
+        }
+    }
+
+    private static class RetriedEntry extends Entry
+    {
+        RetriedEntry(Entry e)
+        {
+            super(e.message, e.id);
+        }
+
+        boolean shouldRetry()
+        {
+            return false;
         }
     }
 }

Reply via email to