Repository: cassandra Updated Branches: refs/heads/trunk 732b10183 -> c26bd9185
Retry all MS messages once after reopening a connection Patch by Tyler Hobbs; reviewed by Yuki Morishita for CASSANDRA-12192 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c26bd918 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c26bd918 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c26bd918 Branch: refs/heads/trunk Commit: c26bd91852cbf19d7dba9f62078f5da31b04dbe0 Parents: 732b101 Author: Tyler Hobbs <tylerlho...@gmail.com> Authored: Wed Aug 24 17:31:07 2016 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Wed Aug 24 17:31:07 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 4 +-- .../cassandra/net/OutboundTcpConnection.java | 33 +++++++++----------- .../net/OutboundTcpConnectionPool.java | 6 ++-- 4 files changed, 22 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index af7d0dd..eadb4a4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.10 + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) * Add support to rebuild from targeted replica (CASSANDRA-9875) * Add sequence distribution type to cassandra stress (CASSANDRA-12490) * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 14d6440..94d71ff 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2144,7 +2144,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // beginning if we restart before they [the CL segments] are discarded for // normal reasons post-truncate. To prevent this, we store truncation // position in the System keyspace. - logger.trace("truncating {}", name); + logger.info("Truncating {}.{}", keyspace.getName(), name); final long truncatedAt; final CommitLogPosition replayAfter; @@ -2197,7 +2197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean }; runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); - logger.trace("truncate complete"); + logger.info("Truncate of {}.{} is complete", keyspace.getName(), name); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 155cd1a..9a47fda 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -113,7 +113,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread if (coalescingWindow < 0) throw new ExceptionInInitializerError( - "Value provided for coalescing window must be greather than 0: " + coalescingWindow); + "Value provided for coalescing window must be greater than 0: " + coalescingWindow); } private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE); @@ -137,9 +137,9 @@ public class OutboundTcpConnection extends FastThreadLocalThread private volatile int currentMsgBufferCount = 0; private volatile int targetVersion; - public OutboundTcpConnection(OutboundTcpConnectionPool pool) + public OutboundTcpConnection(OutboundTcpConnectionPool pool, String name) { - super("MessagingService-Outgoing-" + pool.endPoint()); + super("MessagingService-Outgoing-" + pool.endPoint() + "-" + name); this.poolReference = pool; cs = newCoalescingStrategy(pool.endPoint().getHostAddress()); @@ -176,6 +176,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread void closeSocket(boolean destroyThread) { + logger.debug("Enqueuing socket close for {}", poolReference.endPoint()); backlog.clear(); isStopped = destroyThread; // Exit loop to stop the thread enqueue(CLOSE_SENTINEL, -1); @@ -308,11 +309,10 @@ public class OutboundTcpConnection extends FastThreadLocalThread disconnect(); if (e instanceof IOException || e.getCause() instanceof IOException) { - if (logger.isTraceEnabled()) - logger.trace("error writing to {}", poolReference.endPoint(), e); + logger.debug("Error writing to {}", poolReference.endPoint(), e); - // if the message was important, such as a repair acknowledgement, put it back on the queue - // to retry after re-connecting. See CASSANDRA-5393 + // If we haven't retried this message yet, put it back on the queue to retry after re-connecting. + // See CASSANDRA-5393 and CASSANDRA-12192. if (qm.shouldRetry()) { try @@ -370,13 +370,11 @@ public class OutboundTcpConnection extends FastThreadLocalThread try { socket.close(); - if (logger.isTraceEnabled()) - logger.trace("Socket to {} closed", poolReference.endPoint()); + logger.debug("Socket to {} closed", poolReference.endPoint()); } catch (IOException e) { - if (logger.isTraceEnabled()) - logger.trace("exception closing connection to " + poolReference.endPoint(), e); + logger.debug("Exception closing connection to {}", poolReference.endPoint(), e); } out = null; socket = null; @@ -386,8 +384,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread @SuppressWarnings("resource") private boolean connect() { - if (logger.isTraceEnabled()) - logger.trace("attempting to connect to {}", poolReference.endPoint()); + logger.debug("Attempting to connect to {}", poolReference.endPoint()); long start = System.nanoTime(); long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout()); @@ -463,7 +460,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread if (shouldCompressConnection()) { out.flush(); - logger.trace("Upgrading OutputStream to be compressed"); + logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint()); if (targetVersion < MessagingService.VERSION_21) { // Snappy is buffered, so no need for extra buffering output stream @@ -481,7 +478,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread true)); // no async flushing } } - + logger.debug("Done connecting to {}", poolReference.endPoint()); return true; } catch (SSLHandshakeException e) @@ -494,8 +491,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread catch (IOException e) { socket = null; - if (logger.isTraceEnabled()) - logger.trace("unable to connect to " + poolReference.endPoint(), e); + logger.debug("Unable to connect to {}", poolReference.endPoint(), e); Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS); } } @@ -582,7 +578,8 @@ public class OutboundTcpConnection extends FastThreadLocalThread boolean shouldRetry() { - return !droppable; + // retry all messages once + return true; } public long timestampNanos() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index ecadf89..0418ff6 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -56,9 +56,9 @@ public class OutboundTcpConnectionPool resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp); started = new CountDownLatch(1); - smallMessages = new OutboundTcpConnection(this); - largeMessages = new OutboundTcpConnection(this); - gossipMessages = new OutboundTcpConnection(this); + smallMessages = new OutboundTcpConnection(this, "Small"); + largeMessages = new OutboundTcpConnection(this, "Large"); + gossipMessages = new OutboundTcpConnection(this, "Gossip"); } /**