Add retry mechanism to OutboundTcpConnection (OTC) for non-droppable verbs; patch by jasobrown; reviewed by jbellis for CASSANDRA-5393
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2762f484 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2762f484 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2762f484 Branch: refs/heads/cassandra-1.2 Commit: 2762f484c90152eb23c641732ecf6f650e937bfc Parents: 3b4b143 Author: Jason Brown <jasedbr...@gmail.com> Authored: Thu Apr 18 11:32:27 2013 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Thu Apr 18 11:32:27 2013 -0700 ---------------------------------------------------------------------- .../cassandra/net/OutboundTcpConnection.java | 61 ++++++++++++--- 1 files changed, 51 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2762f484/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 73d0c15..ae9f028 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -25,6 +25,7 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import java.net.SocketException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -72,7 +73,7 @@ public class OutboundTcpConnection extends Thread expireMessages(); try { - backlog.put(new Entry(message, id, System.currentTimeMillis())); + backlog.put(new Entry(message, id)); } catch (InterruptedException e) { @@ -127,7 +128,7 @@ public class OutboundTcpConnection extends Thread if (entry.timestamp < System.currentTimeMillis() - 
DatabaseDescriptor.getRpcTimeout()) dropped.incrementAndGet(); else if (socket != null || connect()) - writeConnected(m, id); + writeConnected(entry, id); else // clear out the queue, else gossip messages back up. active.clear(); @@ -149,8 +150,9 @@ public class OutboundTcpConnection extends Thread return dropped.get(); } - private void writeConnected(Message message, String id) + private void writeConnected(Entry entry, String id) { + Message message = entry.message; try { write(message, id, out); @@ -162,12 +164,32 @@ public class OutboundTcpConnection extends Thread } catch (Exception e) { - // Non IO exceptions is likely a programming error so let's not silence it - if (!(e instanceof IOException)) - logger.error("error writing to " + poolReference.endPoint(), e); - else if (logger.isDebugEnabled()) - logger.debug("error writing to " + poolReference.endPoint(), e); disconnect(); + + if (e instanceof IOException) + { + if (logger.isDebugEnabled()) + logger.debug("error writing to " + poolReference.endPoint(), e); + + // if the message was important, such as a repair acknowledgement, put it back on the queue + // to retry after re-connecting. 
See CASSANDRA-5393 + if (e instanceof SocketException && entry.shouldRetry()) + { + try + { + backlog.put(new RetriedEntry(entry)); + } + catch (InterruptedException e1) + { + throw new AssertionError(e1); + } + } + } + else + { + // Non IO exceptions are likely a programming error so let's not silence them + logger.error("error writing to " + poolReference.endPoint(), e); + } } } @@ -283,17 +305,36 @@ public class OutboundTcpConnection extends Thread } } + /** messages that have not been retried yet */ private static class Entry { final Message message; final String id; final long timestamp; - Entry(Message message, String id, long timestamp) + Entry(Message message, String id) { this.message = message; this.id = id; - this.timestamp = timestamp; + this.timestamp = System.currentTimeMillis(); + } + + boolean shouldRetry() + { + return !MessagingService.DROPPABLE_VERBS.contains(message.getVerb()); + } + } + + private static class RetriedEntry extends Entry + { + RetriedEntry(Entry e) + { + super(e.message, e.id); + } + + boolean shouldRetry() + { + return false; } } }