Updated Branches: refs/heads/trunk 48f5d5716 -> 952c6e536
clean up streamExecutors map patch by jbellis; reviewed by pschuller for CASSANDRA-3679 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/952c6e53 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/952c6e53 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/952c6e53 Branch: refs/heads/trunk Commit: 952c6e536b90abe94874813030033975e33672a4 Parents: 48f5d57 Author: Jonathan Ellis <[email protected]> Authored: Wed Dec 28 12:09:06 2011 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Thu Dec 29 12:16:40 2011 -0600 ---------------------------------------------------------------------- .../org/apache/cassandra/net/MessagingService.java | 72 +++++---------- 1 files changed, 25 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/952c6e53/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 0a072bf..2845495 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ServerSocketChannel; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -97,9 +98,7 @@ public final class MessagingService implements MessagingServiceMBean * means the overhead in the degenerate case of having streamed to everyone in the ring over time as a ring changes, * is not going to be a thread per node - but rather an instance per node. That's totally fine. 
*/ - private final HashMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new HashMap<InetAddress, DebuggableThreadPoolExecutor>(); - /** Very rarely acquired lock protecting streamExecutors. */ - private final Lock streamExecutorsLock = new ReentrantLock(); + private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new NonBlockingHashMap<InetAddress, DebuggableThreadPoolExecutor>(); private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0); private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>(); @@ -472,24 +471,25 @@ public final class MessagingService implements MessagingServiceMBean public void stream(StreamHeader header, InetAddress to) { - this.streamExecutorsLock.lock(); - try + DebuggableThreadPoolExecutor executor = streamExecutors.get(to); + if (executor == null) { - if (!streamExecutors.containsKey(to)) + // Using a core pool size of 0 is important. See documentation of streamExecutors. + executor = new DebuggableThreadPoolExecutor(0, + 1, + 1, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Streaming to " + to)); + DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, executor); + if (old != null) { - // Using a core pool size of 0 is important. See documentation of streamExecutors.
- streamExecutors.put(to, new DebuggableThreadPoolExecutor(0, 1, 1, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("Streaming to " + to))); + executor.shutdown(); + executor = old; } - DebuggableThreadPoolExecutor executor = streamExecutors.get(to); - - executor.execute(new FileStreamTask(header, to)); - } - finally - { - this.streamExecutorsLock.unlock(); } + + executor.execute(new FileStreamTask(header, to)); } public void incrementActiveStreamsOutbound() @@ -520,37 +520,15 @@ public final class MessagingService implements MessagingServiceMBean public void waitForStreaming() throws InterruptedException { - while (true) - { - boolean stillWaiting = false; + // this does not prevent new streams from beginning after a drain begins, but since streams are only + // started in response to explicit operator action (bootstrap/move/repair/etc) that feels like a feature. + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + e.shutdown(); - streamExecutorsLock.lock(); - try - { - for (DebuggableThreadPoolExecutor e : streamExecutors.values()) - { - e.shutdown(); - if (!e.isTerminated()) - { - stillWaiting = true; - break; - } - } - } - finally - { - streamExecutorsLock.unlock(); - } - if (stillWaiting) - { - // Up to a second of unneeded delay is acceptable, relative to the amount of time a typical stream - // takes. - Thread.sleep(1000); - } - else - { - break; - } + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + { + if (!e.awaitTermination(24, TimeUnit.HOURS)) // false means the timeout elapsed before the executor terminated + logger_.error("Stream took more than 24H to complete; skipping"); } }
