Author: jbellis Date: Tue Dec 6 02:11:50 2011 New Revision: 1210748 URL: http://svn.apache.org/viewvc?rev=1210748&view=rev Log: multithreaded streaming patch by Peter Schuller; reviewed by yukim for CASSANDRA-3494
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1210748&r1=1210747&r2=1210748&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Tue Dec 6 02:11:50 2011 @@ -1,4 +1,5 @@ 1.1-dev + * multithreaded streaming (CASSANDRA-3494) * removed in-tree redhat spec (CASSANDRA-3567) * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503) * Recycle commitlog segments for improved performance Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1210748&r1=1210747&r2=1210748&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Dec 6 02:11:50 2011 @@ -86,9 +86,9 @@ public class DebuggableThreadPoolExecuto this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory); } - protected DebuggableThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) + public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { - super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory); + 
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); allowCoreThreadTimeOut(true); // block task submissions until queue has room. Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1210748&r1=1210747&r2=1210748&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Dec 6 02:11:50 2011 @@ -27,14 +27,20 @@ import java.nio.channels.AsynchronousClo import java.nio.channels.ServerSocketChannel; import java.util.*; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.base.Function; import com.google.common.collect.Lists; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,8 +87,22 @@ public final class MessagingService impl /* Lookup table for registering message handlers based on the verb. */ private final Map<StorageService.Verb, IVerbHandler> verbHandlers_; - /* Thread pool to handle messaging write activities */ - private final DebuggableThreadPoolExecutor streamExecutor_; + /** One executor per destination InetAddress for streaming. + * + * See CASSANDRA-3494 for the background. 
We have stream throttling in place, so we do not need to limit ourselves to + * one stream at a time for throttling reasons. But we also do not want to arbitrarily stream an unlimited + * number of files at once, because a single destination might have hundreds of files pending and that would cause a + * seek storm. So, transfer exactly one file at a time per destination host. That puts a very natural rate limit on it, in + * addition to mapping well to the expected behavior in many cases. + * + * We create our stream executors with a core size of 0 so that their idle threads time out and are not kept alive. This + * means the overhead in the degenerate case of having streamed to every node in the ring over time, as the ring changes, + * is not a thread per node but rather an executor instance per node. That's totally fine. + */ + private final HashMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new HashMap<InetAddress, DebuggableThreadPoolExecutor>(); + /** Very rarely acquired lock protecting streamExecutors. */ + private final Lock streamExecutorsLock = new ReentrantLock(); + private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0); private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>(); @@ -137,7 +157,6 @@ public final class MessagingService impl listenGate = new SimpleCondition(); verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class); - streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", Thread.MIN_PRIORITY); Runnable logDropped = new Runnable() { public void run() @@ -449,14 +468,40 @@ public final class MessagingService impl public void stream(StreamHeader header, InetAddress to) { - /* Streaming asynchronously on streamExector_ threads.
*/ - streamExecutor_.execute(new FileStreamTask(header, to)); + this.streamExecutorsLock.lock(); + try + { + if (!streamExecutors.containsKey(to)) + { + // Using a core pool size of 0 is important. See documentation of streamExecutors. + streamExecutors.put(to, new DebuggableThreadPoolExecutor(0, 1, 1, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Streaming to " + to))); + } + DebuggableThreadPoolExecutor executor = streamExecutors.get(to); + + executor.execute(new FileStreamTask(header, to)); + } + finally + { + this.streamExecutorsLock.unlock(); + } + } + + public void incrementActiveStreamsOutbound() + { + activeStreamsOutbound.incrementAndGet(); + } + + public void decrementActiveStreamsOutbound() + { + activeStreamsOutbound.decrementAndGet(); } /** The count of active outbound stream tasks. */ public int getActiveStreamsOutbound() { - return streamExecutor_.getActiveCount(); + return activeStreamsOutbound.get(); } public void register(ILatencySubscriber subcriber) @@ -464,14 +509,44 @@ public final class MessagingService impl subscribers.add(subcriber); } - public void waitForStreaming() throws InterruptedException + public void clearCallbacksUnsafe() { - streamExecutor_.awaitTermination(24, TimeUnit.HOURS); + callbacks.clear(); } - public void clearCallbacksUnsafe() + public void waitForStreaming() throws InterruptedException { - callbacks.clear(); + while (true) + { + boolean stillWaiting = false; + + streamExecutorsLock.lock(); + try + { + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + { + if (!e.isTerminated()) + { + stillWaiting = true; + break; + } + } + } + finally + { + streamExecutorsLock.unlock(); + } + if (stillWaiting) + { + // Up to a second of unneeded delay is acceptable, relative to the amount of time a typical stream + // takes. 
+ Thread.sleep(1000); + } + else + { + break; + } + } } public void shutdown() @@ -490,7 +565,20 @@ public final class MessagingService impl throw new IOError(e); } - streamExecutor_.shutdown(); + streamExecutorsLock.lock(); + try + { + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + { + e.shutdown(); + } + } + finally + { + streamExecutorsLock.unlock(); + } + + callbacks.shutdown(); logger_.info("Waiting for in-progress requests to complete"); callbacks.shutdown(); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1210748&r1=1210747&r2=1210748&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Tue Dec 6 02:11:50 2011 @@ -138,6 +138,7 @@ public class FileStreamTask extends Wrap // setting up data compression stream compressedoutput = new LZFOutputStream(output); + MessagingService.instance().incrementActiveStreamsOutbound(); try { // stream each of the required sections of the file @@ -170,6 +171,8 @@ public class FileStreamTask extends Wrap } finally { + MessagingService.instance().decrementActiveStreamsOutbound(); + // no matter what happens close file FileUtils.closeQuietly(file); }