Updated Branches: refs/heads/trunk c550cc829 -> fdbddc132
Batch read from OTC's queue and cleanup patch by jasorown and belliotsmith for CASSANDRA-1632 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fdbddc13 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fdbddc13 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fdbddc13 Branch: refs/heads/trunk Commit: fdbddc13272bed6dfa3019e77acfb5f9107d6dce Parents: c550cc8 Author: Jason Brown <[email protected]> Authored: Thu Nov 21 15:20:28 2013 -0800 Committer: Jason Brown <[email protected]> Committed: Thu Nov 21 15:20:28 2013 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/net/OutboundTcpConnection.java | 95 ++++++++++---------- 2 files changed, 51 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdbddc13/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6a3de3c..06628a0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278) * User-defined types for CQL3 (CASSANDRA-5590) * Use of o.a.c.metrics in nodetool (CASSANDRA-5871) + * Batch read from OTC's queue and cleanup (CASSANDRA-1632) 2.0.4 http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdbddc13/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 3bdbca3..7422f22 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -25,6 +25,9 @@ import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -65,10 +68,7 @@ public class OutboundTcpConnection extends Thread static final int LZ4_HASH_SEED = 0x9747b28c; - // sending thread reads from "active" (one of queue1, queue2) until it is empty. - // then it swaps it with "backlog." - private volatile BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<QueuedMessage>(); - private volatile BlockingQueue<QueuedMessage> active = new LinkedBlockingQueue<QueuedMessage>(); + private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>(); private final OutboundTcpConnectionPool poolReference; @@ -76,6 +76,7 @@ public class OutboundTcpConnection extends Thread private Socket socket; private volatile long completed; private final AtomicLong dropped = new AtomicLong(); + private volatile int currentMsgBufferCount = 0; private int targetVersion; public OutboundTcpConnection(OutboundTcpConnectionPool pool) @@ -93,7 +94,8 @@ public class OutboundTcpConnection extends Thread public void enqueue(MessageOut<?> message, int id) { - expireMessages(); + if (backlog.size() > 1024) + expireMessages(); try { backlog.put(new QueuedMessage(message, id)); @@ -106,7 +108,6 @@ public class OutboundTcpConnection extends Thread void closeSocket(boolean destroyThread) { - active.clear(); backlog.clear(); isStopped = destroyThread; // Exit loop to stop the thread enqueue(CLOSE_SENTINEL, -1); @@ -124,47 +125,61 @@ public class OutboundTcpConnection extends Thread public void run() { + // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize) + final List<QueuedMessage> drainedMessages = new ArrayList<>(128); + outer: while (true) { - QueuedMessage qm = active.poll(); - if (qm == null) + if (backlog.drainTo(drainedMessages, drainedMessages.size()) == 0) { - // exhausted the active queue. switch to backlog, once there's something to process there try { - qm = backlog.take(); + drainedMessages.add(backlog.take()); } catch (InterruptedException e) { throw new AssertionError(e); } - BlockingQueue<QueuedMessage> tmp = backlog; - backlog = active; - active = tmp; } + currentMsgBufferCount = drainedMessages.size(); - MessageOut<?> m = qm.message; - if (m == CLOSE_SENTINEL) + int count = drainedMessages.size(); + for (QueuedMessage qm : drainedMessages) { - disconnect(); - if (isStopped) - break; - continue; + try + { + MessageOut<?> m = qm.message; + if (m == CLOSE_SENTINEL) + { + disconnect(); + if (isStopped) + break outer; + continue; + } + if (qm.timestamp < System.currentTimeMillis() - m.getTimeout()) + dropped.incrementAndGet(); + else if (socket != null || connect()) + writeConnected(qm, count == 1 && backlog.size() == 0); + else + // clear out the queue, else gossip messages back up. + backlog.clear(); + } + catch (Exception e) + { + // really shouldn't get here, as exception handling in writeConnected() is reasonably robust + // but we want to catch anything bad we don't drop the messages in the current batch + logger.error("error processing a message intended for {}", poolReference.endPoint(), e); + } + currentMsgBufferCount = --count; } - if (qm.timestamp < System.currentTimeMillis() - m.getTimeout()) - dropped.incrementAndGet(); - else if (socket != null || connect()) - writeConnected(qm); - else - // clear out the queue, else gossip messages back up. - active.clear(); + drainedMessages.clear(); } } public int getPendingMessages() { - return active.size() + backlog.size(); + return backlog.size() + currentMsgBufferCount; } public long getCompletedMesssages() @@ -184,7 +199,7 @@ public class OutboundTcpConnection extends Thread || (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint())); } - private void writeConnected(QueuedMessage qm) + private void writeConnected(QueuedMessage qm, boolean flush) { try { @@ -210,7 +225,7 @@ public class OutboundTcpConnection extends Thread writeInternal(qm.message, qm.id, qm.timestamp); completed++; - if (active.peek() == null) + if (flush) out.flush(); } catch (Exception e) @@ -435,23 +450,13 @@ public class OutboundTcpConnection extends Thread private void expireMessages() { - while (true) + Iterator<QueuedMessage> iter = backlog.iterator(); + while (iter.hasNext()) { - QueuedMessage qm = backlog.peek(); - if (qm == null || qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout()) - break; - - QueuedMessage qm2 = backlog.poll(); - if (qm2 != qm) - { - // sending thread switched queues. add this entry (from the "new" backlog) - // at the end of the active queue, which keeps it in the same position relative to the other entries - // without having to contend with other clients for the head-of-backlog lock. - if (qm2 != null) - active.add(qm2); - break; - } - + QueuedMessage qm = iter.next(); + if (qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout()) + return; + iter.remove(); dropped.incrementAndGet(); } }
