Updated Branches: refs/heads/cassandra-1.0 d10da1552 -> 185eca5d1
prevent slow clients from postponing shutdown indefinitely patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3727 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/185eca5d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/185eca5d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/185eca5d Branch: refs/heads/cassandra-1.0 Commit: 185eca5d1fa7e384bb888c144d06abbced0fd577 Parents: d10da15 Author: Jonathan Ellis <[email protected]> Authored: Wed Jan 11 11:44:32 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Wed Jan 11 12:48:40 2012 -0600 ---------------------------------------------------------------------- .../concurrent/DebuggableThreadPoolExecutor.java | 2 +- .../cassandra/concurrent/NamedThreadFactory.java | 1 + src/java/org/apache/cassandra/db/Memtable.java | 8 +++- .../org/apache/cassandra/net/MessagingService.java | 10 +--- .../cassandra/service/AbstractCassandraDaemon.java | 8 ++- .../apache/cassandra/service/StorageService.java | 9 ++-- .../cassandra/thrift/CustomTThreadPoolServer.java | 43 +++++++-------- .../org/apache/cassandra/utils/ExpiringMap.java | 17 ++++++ .../org/apache/cassandra/service/RemoveTest.java | 2 +- 9 files changed, 59 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index 7a344d8..f111d37 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -108,7 +108,7 @@ public class 
DebuggableThreadPoolExecutor extends ThreadPoolExecutor protected void onFinalRejection(Runnable task) {} @Override - public void afterExecute(Runnable r, Throwable t) + protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r,t); logExceptionsAfterExecute(r, t); http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index 4cee8dc..a60a0d5 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -50,6 +50,7 @@ public class NamedThreadFactory implements ThreadFactory String name = id + ":" + n.getAndIncrement(); Thread thread = new Thread(runnable, name); thread.setPriority(priority); + thread.setDaemon(true); return thread; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index fdafc6d..412b800 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator; import org.apache.cassandra.db.commitlog.ReplayPosition; @@ -57,7 +58,12 @@ public class Memtable // we're careful to only allow one count to run at a time because counting is slow // (can be 
minutes, for a large memtable and a busy server), so we could keep memtables // alive after they're flushed and would otherwise be GC'd. - private static final ExecutorService meterExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()) + private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1, + 1, + Integer.MAX_VALUE, + TimeUnit.MILLISECONDS, + new SynchronousQueue<Runnable>(), + new NamedThreadFactory("MemoryMeter")) { @Override protected void afterExecute(Runnable r, Throwable t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 1526fa3..9ff110e 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -475,15 +475,9 @@ public final class MessagingService implements MessagingServiceMBean } /** - * There isn't a good way to shut down the MessagingService. One problem (but not the only one) - * is that StorageProxy has no way to communicate back to clients, "I'm nominally alive, but I can't - * send that request to the nodes with your data." Neither TimedOut nor Unavailable is appropriate - * to return in that situation. - * - * So instead of shutting down MS and letting StorageProxy/clients cope somehow, we shut down - * the Thrift service and then wait for all the outstanding requests to finish or timeout. 
+ * Wait for callbacks and don't allow any more to be created (since they could require writing hints) */ - public void waitForCallbacks() + public void shutdown() { logger_.info("Waiting for messaging service to quiesce"); // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java index 1a3ebc9..028f82f 100644 --- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.gms.Gossiper; import org.apache.log4j.PropertyConfigurator; @@ -391,13 +392,16 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon /** * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool * interface (for integration with Avro), and performs ClientState cleanup. + * + * (Note that the tasks being executed perform their own while-command-process + * loop until the client disconnects.) 
*/ public static class CleaningThreadPool extends ThreadPoolExecutor { private ThreadLocal<ClientState> state; public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads) { - super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); + super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift")); this.state = state; } @@ -408,7 +412,5 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); state.get().logout(); } - - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 9c1195d..33f58a0 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -323,7 +323,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe daemon.startRPCServer(); } - // should only be called via JMX public void stopRPCServer() { if (daemon == null) @@ -347,7 +346,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Gossiper.instance.unregister(migrationManager); Gossiper.instance.unregister(this); Gossiper.instance.stop(); - MessagingService.instance().waitForCallbacks(); + MessagingService.instance().shutdown(); // give it a second so that task accepted before the MessagingService shutdown gets submitted to the stage (to avoid RejectedExecutionException) try { Thread.sleep(1000L); } catch (InterruptedException e) {} StageManager.shutdownNow(); @@ -449,7 +448,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // 
In-progress writes originating here could generate hints to be written, so shut down MessagingService // before mutation stage, so we can get all the hints saved before shutting down - MessagingService.instance().waitForCallbacks(); + MessagingService.instance().shutdown(); mutationStage.shutdown(); mutationStage.awaitTermination(3600, TimeUnit.SECONDS); StorageProxy.instance.verifyNoHintsInProgress(); @@ -2110,7 +2109,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe public void run() { Gossiper.instance.stop(); - MessagingService.instance().waitForCallbacks(); + MessagingService.instance().shutdown(); StageManager.shutdownNow(); setMode(Mode.DECOMMISSIONED, true); // let op be responsible for killing the process @@ -2512,7 +2511,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Gossiper.instance.stop(); setMode(Mode.DRAINING, "shutting down MessageService", false); - MessagingService.instance().waitForCallbacks(); + MessagingService.instance().shutdown(); setMode(Mode.DRAINING, "waiting for streaming", false); MessagingService.instance().waitForStreaming(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java index c9a5f5b..161ff12 100644 --- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java +++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java @@ -119,27 +119,24 @@ public class CustomTThreadPoolServer extends TServer } executorService_.shutdown(); - - // Loop until awaitTermination finally does return without a interrupted - // exception. If we don't do this, then we'll shut down prematurely. 
 We want - // to let the executorService clear it's task queue, closing client sockets - // appropriately. - long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); - long now = System.currentTimeMillis(); - while (timeoutMS >= 0) - { - try - { - executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); - break; - } - catch (InterruptedException ix) - { - long newnow = System.currentTimeMillis(); - timeoutMS -= (newnow - now); - now = newnow; - } - } + // Thrift's default shutdown waits for the WorkerProcess threads to complete. We do not, + // because doing that allows a client to hold our shutdown "hostage" by simply not sending + // another message after stop is called (since process will block indefinitely trying to read + // the next message header). + // + // The "right" fix would be to update thrift to set a socket timeout on client connections + // (and tolerate unintentional timeouts until stopped_ is set). But this requires deep + // changes to the code generator, so simply setting these threads to daemon (in our custom + // CleaningThreadPool) and ignoring them after shutdown is good enough. + // + // Remember, our goal on shutdown is not necessarily that each client request we receive + // gets answered first [to do that, you should redirect clients to a different coordinator + // first], but rather (1) to make sure that for each update we ack as successful, we generate + // hints for any non-responsive replicas, and (2) to make sure that we quickly stop + // accepting client connections so shutdown can continue. Not waiting for the WorkerProcess + // threads here accomplishes (2); MessagingService's shutdown method takes care of (1). + // + // See CASSANDRA-3335 and CASSANDRA-3727.
} public void stop() @@ -184,7 +181,9 @@ public class CustomTThreadPoolServer extends TServer inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); // we check stopped_ first to make sure we're not supposed to be shutting - // down. this is necessary for graceful shutdown. + // down. this is necessary for graceful shutdown. (but not sufficient, + // since process() can take arbitrarily long waiting for client input. + // See comments at the end of serve().) while (!stopped_ && processor.process(inputProtocol, outputProtocol)) { inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index ffd3c2e..0672259 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -30,6 +30,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap; public class ExpiringMap<K, V> { private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class); + private volatile boolean shutdown; private static class CacheableObject<T> { @@ -104,6 +105,7 @@ public class ExpiringMap<K, V> public void shutdown() { + shutdown = true; while (!cache.isEmpty()) { logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size()); @@ -131,6 +133,21 @@ public class ExpiringMap<K, V> public V put(K key, V value, long timeout) { + if (shutdown) + { + // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out." + // So we'll just sit on this thread until the rest of the server shutdown completes. 
+ // + // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727. + try + { + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout)); return (previous == null) ? null : previous.getValue(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 3034ab3..7ee7d69 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -85,7 +85,7 @@ public class RemoveTest extends CleanupHelper { SinkManager.clear(); MessagingService.instance().clearCallbacksUnsafe(); - MessagingService.instance().waitForCallbacks(); + MessagingService.instance().shutdown(); ss.setPartitionerUnsafe(oldPartitioner); }
