Updated Branches:
  refs/heads/trunk 48f5d5716 -> 952c6e536

clean up streamExecutors map
patch by jbellis; reviewed by pschuller for CASSANDRA-3679


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/952c6e53
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/952c6e53
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/952c6e53

Branch: refs/heads/trunk
Commit: 952c6e536b90abe94874813030033975e33672a4
Parents: 48f5d57
Author: Jonathan Ellis <[email protected]>
Authored: Wed Dec 28 12:09:06 2011 -0600
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Dec 29 12:16:40 2011 -0600

----------------------------------------------------------------------
 .../org/apache/cassandra/net/MessagingService.java |   72 +++++----------
 1 files changed, 25 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/952c6e53/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 0a072bf..2845495 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -97,9 +98,7 @@ public final class MessagingService implements 
MessagingServiceMBean
      * means the overhead in the degenerate case of having streamed to 
everyone in the ring over time as a ring changes,
      * is not going to be a thread per node - but rather an instance per node. 
That's totally fine.
      */
-    private final HashMap<InetAddress, DebuggableThreadPoolExecutor> 
streamExecutors = new HashMap<InetAddress, DebuggableThreadPoolExecutor>();
-    /** Very rarely acquired lock protecting streamExecutors. */
-    private final Lock streamExecutorsLock = new ReentrantLock();
+    private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> 
streamExecutors = new NonBlockingHashMap<InetAddress, 
DebuggableThreadPoolExecutor>();
     private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0);
 
     private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> 
connectionManagers_ = new NonBlockingHashMap<InetAddress, 
OutboundTcpConnectionPool>();
@@ -472,24 +471,25 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public void stream(StreamHeader header, InetAddress to)
     {
-        this.streamExecutorsLock.lock();
-        try
+        DebuggableThreadPoolExecutor executor = streamExecutors.get(to);
+        if (executor == null)
         {
-            if (!streamExecutors.containsKey(to))
+            // Using a core pool size of 0 is important. See documentation of 
streamExecutors.
+            executor = new DebuggableThreadPoolExecutor(0,
+                                                        1,
+                                                        1,
+                                                        TimeUnit.SECONDS,
+                                                        new 
LinkedBlockingQueue<Runnable>(),
+                                                        new 
NamedThreadFactory("Streaming to " + to));
+            DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, 
executor);
+            if (old != null)
             {
-                // Using a core pool size of 0 is important. See documentation 
of streamExecutors.
-                streamExecutors.put(to, new DebuggableThreadPoolExecutor(0, 1, 
1, TimeUnit.SECONDS,
-                        new LinkedBlockingQueue<Runnable>(),
-                        new NamedThreadFactory("Streaming to " + to)));
+                executor.shutdown();
+                executor = old;
             }
-            DebuggableThreadPoolExecutor executor = streamExecutors.get(to);
-
-            executor.execute(new FileStreamTask(header, to));
-        }
-        finally
-        {
-            this.streamExecutorsLock.unlock();
         }
+
+        executor.execute(new FileStreamTask(header, to));
     }
 
     public void incrementActiveStreamsOutbound()
@@ -520,37 +520,15 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public void waitForStreaming() throws InterruptedException
     {
-        while (true)
-        {
-            boolean stillWaiting = false;
+        // this does not prevent new streams from beginning after a drain 
begins, but since streams are only
+        // started in response to explicit operator action 
(bootstrap/move/repair/etc) that feels like a feature.
+        for (DebuggableThreadPoolExecutor e : streamExecutors.values())
+            e.shutdown();
 
-            streamExecutorsLock.lock();
-            try
-            {
-                for (DebuggableThreadPoolExecutor e : streamExecutors.values())
-                {
-                    e.shutdown();
-                    if (!e.isTerminated())
-                    {
-                        stillWaiting = true;
-                        break;
-                    }
-                }
-            }
-            finally
-            {
-                streamExecutorsLock.unlock();
-            }
-            if (stillWaiting)
-            {
-                // Up to a second of unneeded delay is acceptable, relative to 
the amount of time a typical stream
-                // takes.
-                Thread.sleep(1000);
-            }
-            else
-            {
-                break;
-            }
+        for (DebuggableThreadPoolExecutor e : streamExecutors.values())
+        {
+            if (e.awaitTermination(24, TimeUnit.HOURS))
+                logger_.error("Stream took more than 24H to complete; 
skipping");
         }
     }
 

Reply via email to