Updated Branches: refs/heads/trunk 29974346a -> 317dca584
Add retry mechanism to OTC (OutboundTcpConnection) for non-droppable verbs; patch by jasobrown; reviewed by jbellis for CASSANDRA-5393 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/262e006e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/262e006e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/262e006e Branch: refs/heads/trunk Commit: 262e006eb76ca7fce366fd51310a980fd5312837 Parents: 9fc071a Author: Jason Brown <[email protected]> Authored: Thu Apr 18 13:38:49 2013 -0700 Committer: Jason Brown <[email protected]> Committed: Thu Apr 18 13:38:49 2013 -0700 ---------------------------------------------------------------------- .../cassandra/net/OutboundTcpConnection.java | 54 ++++++++++++-- 1 files changed, 46 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/262e006e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 2da678c..b1a1fe9 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -82,7 +82,7 @@ public class OutboundTcpConnection extends Thread expireMessages(); try { - backlog.put(new QueuedMessage(message, id, System.currentTimeMillis())); + backlog.put(new QueuedMessage(message, id)); } catch (InterruptedException e) { @@ -192,12 +192,31 @@ public class OutboundTcpConnection extends Thread } catch (Exception e) { - // Non IO exceptions is likely a programming error so let's not silence it - if (!(e instanceof IOException)) - logger.error("error writing to " + poolReference.endPoint(), e); - else if (logger.isDebugEnabled()) - logger.debug("error writing to " + 
poolReference.endPoint(), e); disconnect(); + if (e instanceof IOException) + { + if (logger.isDebugEnabled()) + logger.debug("error writing to " + poolReference.endPoint(), e); + + // if the message was important, such as a repair acknowledgement, put it back on the queue + // to retry after re-connecting. See CASSANDRA-5393 + if (e instanceof SocketException && qm.shouldRetry()) + { + try + { + backlog.put(new RetriedQueuedMessage(qm)); + } + catch (InterruptedException e1) + { + throw new AssertionError(e1); + } + } + } + else + { + // Non IO exceptions are likely a programming error so let's not silence them + logger.error("error writing to " + poolReference.endPoint(), e); + } } } @@ -366,17 +385,36 @@ public class OutboundTcpConnection extends Thread } } + /** messages that have not been retried yet */ private static class QueuedMessage { final MessageOut<?> message; final String id; final long timestamp; - QueuedMessage(MessageOut<?> message, String id, long timestamp) + QueuedMessage(MessageOut<?> message, String id) { this.message = message; this.id = id; - this.timestamp = timestamp; + this.timestamp = System.currentTimeMillis(); + } + + boolean shouldRetry() + { + return MessagingService.DROPPABLE_VERBS.contains(message.verb); + } + } + + private static class RetriedQueuedMessage extends QueuedMessage + { + RetriedQueuedMessage(QueuedMessage msg) + { + super(msg.message, msg.id); + } + + boolean shouldRetry() + { + return false; } } }
