Updated Branches: refs/heads/trunk 488129d95 -> e1b199c05
merge from 1.0 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1b199c0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1b199c0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1b199c0 Branch: refs/heads/trunk Commit: e1b199c054c51fa73b1d73cc4171c3e663d61dec Parents: 488129d 02346a1 Author: Jonathan Ellis <[email protected]> Authored: Wed Dec 28 09:52:48 2011 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Wed Dec 28 09:52:48 2011 -0600 ---------------------------------------------------------------------- .../org/apache/cassandra/net/MessagingService.java | 1 + .../apache/cassandra/service/StorageService.java | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1b199c0/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index 2d393d6,1526fa3..7511293 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -513,44 -463,15 +513,45 @@@ public final class MessagingService imp subscribers.add(subcriber); } - public void waitForStreaming() throws InterruptedException + public void clearCallbacksUnsafe() { - streamExecutor_.shutdown(); - streamExecutor_.awaitTermination(24, TimeUnit.HOURS); + callbacks.clear(); } - public void clearCallbacksUnsafe() + public void waitForStreaming() throws InterruptedException { - callbacks.clear(); + while (true) + { + boolean stillWaiting = false; + + streamExecutorsLock.lock(); + try + { + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + { ++ e.shutdown() + if (!e.isTerminated()) + { + stillWaiting = true; + break; + } + } + } + finally + { + streamExecutorsLock.unlock(); + } + if (stillWaiting) + { + // Up to a second of unneeded delay is acceptable, relative to the amount of time a typical stream + // takes. + Thread.sleep(1000); + } + else + { + break; + } + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1b199c0/src/java/org/apache/cassandra/service/StorageService.java ----------------------------------------------------------------------
