Repository: zookeeper Updated Branches: refs/heads/master a109b8b50 -> 061e76123
ZOOKEEPER-3183: Interrupting the WatcherCleaner thread during shutdown â¦are waiting list to add watchers during the shutdown and avoid adding the dead watchers when shut down is initiated Author: vtumati <[email protected]> Reviewers: [email protected], [email protected] Closes #689 from tumativ/ZOOKEEPER-3183 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/061e7612 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/061e7612 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/061e7612 Branch: refs/heads/master Commit: 061e76123e91db4b9c28ab77e58df1b723df00d9 Parents: a109b8b Author: vtumati <[email protected]> Authored: Wed Nov 28 14:43:41 2018 -0800 Committer: Fangmin Lyu <[email protected]> Committed: Wed Nov 28 14:43:41 2018 -0800 ---------------------------------------------------------------------- .../zookeeper/server/watch/WatcherCleaner.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/061e7612/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java index 2bfb5aa..9648848 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java @@ -50,6 +50,7 @@ public class WatcherCleaner extends Thread { private volatile boolean stopped = false; private final Object cleanEvent = new Object(); + private final Object processingCompletedEvent = new Object(); private final Random r = new Random(System.nanoTime()); private final WorkerService cleaners; @@ -102,12 +103,13 @@ public class WatcherCleaner extends Thread { totalDeadWatchers.get() >= maxInProcessingDeadWatchers) { try { RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning"); - synchronized(totalDeadWatchers) { - totalDeadWatchers.wait(100); + synchronized(processingCompletedEvent) { + processingCompletedEvent.wait(100); } } catch (InterruptedException e) { LOG.info("Got interrupted while waiting for dead watches " + "queue size"); + break; } } synchronized (this) { @@ -129,7 +131,7 @@ public class WatcherCleaner extends Thread { try { // add some jitter to avoid cleaning dead watchers at the // same time in the quorum - if (deadWatchers.size() < watcherCleanThreshold) { + if (!stopped && deadWatchers.size() < watcherCleanThreshold) { int maxWaitMs = (watcherCleanIntervalInSeconds + r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000; cleanEvent.wait(maxWaitMs); @@ -163,8 +165,8 @@ public class WatcherCleaner extends Thread { long latency = Time.currentElapsedTime() - startTime; LOG.info("Takes {} to process {} watches", latency, total); totalDeadWatchers.addAndGet(-total); - synchronized(totalDeadWatchers) { - totalDeadWatchers.notifyAll(); + synchronized(processingCompletedEvent) { + processingCompletedEvent.notifyAll(); } } }); @@ -177,6 +179,10 @@ public class WatcherCleaner extends Thread { stopped = true; deadWatchers.clear(); cleaners.stop(); + this.interrupt(); + if (LOG.isInfoEnabled()) { + LOG.info("WatcherCleaner thread shutdown is initiated"); + } } }
