Repository: cassandra
Updated Branches:
  refs/heads/trunk 732b10183 -> c26bd9185


Retry all MS messages once after reopening a connection

Patch by Tyler Hobbs; reviewed by Yuki Morishita for CASSANDRA-12192


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c26bd918
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c26bd918
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c26bd918

Branch: refs/heads/trunk
Commit: c26bd91852cbf19d7dba9f62078f5da31b04dbe0
Parents: 732b101
Author: Tyler Hobbs <tylerlho...@gmail.com>
Authored: Wed Aug 24 17:31:07 2016 -0500
Committer: Tyler Hobbs <tylerlho...@gmail.com>
Committed: Wed Aug 24 17:31:07 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  4 +--
 .../cassandra/net/OutboundTcpConnection.java    | 33 +++++++++-----------
 .../net/OutboundTcpConnectionPool.java          |  6 ++--
 4 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af7d0dd..eadb4a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.10
+ * Retry all internode messages once after a connection is
+   closed and reopened (CASSANDRA-12192)
  * Add support to rebuild from targeted replica (CASSANDRA-9875)
  * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
  * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 14d6440..94d71ff 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2144,7 +2144,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // beginning if we restart before they [the CL segments] are discarded for
         // normal reasons post-truncate.  To prevent this, we store truncation
         // position in the System keyspace.
-        logger.trace("truncating {}", name);
+        logger.info("Truncating {}.{}", keyspace.getName(), name);
 
         final long truncatedAt;
         final CommitLogPosition replayAfter;
@@ -2197,7 +2197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         };
 
         runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
-        logger.trace("truncate complete");
+        logger.info("Truncate of {}.{} is complete", keyspace.getName(), name);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 155cd1a..9a47fda 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -113,7 +113,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
 
         if (coalescingWindow < 0)
             throw new ExceptionInInitializerError(
-                    "Value provided for coalescing window must be greather than 0: " + coalescingWindow);
+                    "Value provided for coalescing window must be greater than 0: " + coalescingWindow);
     }
 
     private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
@@ -137,9 +137,9 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     private volatile int currentMsgBufferCount = 0;
     private volatile int targetVersion;
 
-    public OutboundTcpConnection(OutboundTcpConnectionPool pool)
+    public OutboundTcpConnection(OutboundTcpConnectionPool pool, String name)
     {
-        super("MessagingService-Outgoing-" + pool.endPoint());
+        super("MessagingService-Outgoing-" + pool.endPoint() + "-" + name);
         this.poolReference = pool;
         cs = newCoalescingStrategy(pool.endPoint().getHostAddress());
 
@@ -176,6 +176,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
 
     void closeSocket(boolean destroyThread)
     {
+        logger.debug("Enqueuing socket close for {}", poolReference.endPoint());
         backlog.clear();
         isStopped = destroyThread; // Exit loop to stop the thread
         enqueue(CLOSE_SENTINEL, -1);
@@ -308,11 +309,10 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             disconnect();
             if (e instanceof IOException || e.getCause() instanceof IOException)
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("error writing to {}", poolReference.endPoint(), e);
+                logger.debug("Error writing to {}", poolReference.endPoint(), e);
 
-                // if the message was important, such as a repair acknowledgement, put it back on the queue
-                // to retry after re-connecting.  See CASSANDRA-5393
+                // If we haven't retried this message yet, put it back on the queue to retry after re-connecting.
+                // See CASSANDRA-5393 and CASSANDRA-12192.
                 if (qm.shouldRetry())
                 {
                     try
@@ -370,13 +370,11 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             try
             {
                 socket.close();
-                if (logger.isTraceEnabled())
-                    logger.trace("Socket to {} closed", poolReference.endPoint());
+                logger.debug("Socket to {} closed", poolReference.endPoint());
             }
             catch (IOException e)
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("exception closing connection to " + poolReference.endPoint(), e);
+                logger.debug("Exception closing connection to {}", poolReference.endPoint(), e);
             }
             out = null;
             socket = null;
@@ -386,8 +384,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     @SuppressWarnings("resource")
     private boolean connect()
     {
-        if (logger.isTraceEnabled())
-            logger.trace("attempting to connect to {}", poolReference.endPoint());
+        logger.debug("Attempting to connect to {}", poolReference.endPoint());
 
         long start = System.nanoTime();
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
@@ -463,7 +460,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                 if (shouldCompressConnection())
                 {
                     out.flush();
-                    logger.trace("Upgrading OutputStream to be compressed");
+                    logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint());
                     if (targetVersion < MessagingService.VERSION_21)
                     {
                         // Snappy is buffered, so no need for extra buffering output stream
@@ -481,7 +478,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                                                                             
true)); // no async flushing
                     }
                 }
-
+                logger.debug("Done connecting to {}", poolReference.endPoint());
                 return true;
             }
             catch (SSLHandshakeException e)
@@ -494,8 +491,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             catch (IOException e)
             {
                 socket = null;
-                if (logger.isTraceEnabled())
-                    logger.trace("unable to connect to " + poolReference.endPoint(), e);
+                logger.debug("Unable to connect to {}", poolReference.endPoint(), e);
                 Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
             }
         }
@@ -582,7 +578,8 @@ public class OutboundTcpConnection extends FastThreadLocalThread
 
         boolean shouldRetry()
         {
-            return !droppable;
+            // retry all messages once
+            return true;
         }
 
         public long timestampNanos()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index ecadf89..0418ff6 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -56,9 +56,9 @@ public class OutboundTcpConnectionPool
         resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
         started = new CountDownLatch(1);
 
-        smallMessages = new OutboundTcpConnection(this);
-        largeMessages = new OutboundTcpConnection(this);
-        gossipMessages = new OutboundTcpConnection(this);
+        smallMessages = new OutboundTcpConnection(this, "Small");
+        largeMessages = new OutboundTcpConnection(this, "Large");
+        gossipMessages = new OutboundTcpConnection(this, "Gossip");
     }
 
     /**

Reply via email to