Repository: bookkeeper Updated Branches: refs/heads/master d86351371 -> 5d43260e8
BOOKKEEPER-1061: BookieWatcher should not do ZK blocking operations from ZK async callback thread. In some cases, the BookieWatcher can get the ZK event thread stuck. This happens when a ZK blocking request is issued from a ZK callback thread. We should decouple the blocking requests in a separate executor to avoid deadlocking the ZK client. Author: Matteo Merli <[email protected]> Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]> Closes #149 from merlimat/bookie-watcher-thread Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5d43260e Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5d43260e Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5d43260e Branch: refs/heads/master Commit: 5d43260e84c6121df72c0ee4c844651bfb726638 Parents: d863513 Author: Matteo Merli <[email protected]> Authored: Mon May 15 12:35:26 2017 -0700 Committer: Sijie Guo <[email protected]> Committed: Mon May 15 12:35:26 2017 -0700 ---------------------------------------------------------------------- .../apache/bookkeeper/client/BookieWatcher.java | 29 +++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5d43260e/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java index ae0ac50..e15f49a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java @@ -17,6 +17,8 @@ */ package org.apache.bookkeeper.client; +import static 
org.apache.bookkeeper.util.SafeRunnable.safeRun; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -183,14 +185,18 @@ class BookieWatcher implements Watcher, ChildrenCallback { HashSet<BookieSocketAddress> newBookieAddrs = convertToBookieAddresses(children); - synchronized (this) { - Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies(); - placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies); - if (bk.conf.getDiskWeightBasedPlacementEnabled()) { - // start collecting bookieInfo for the newly joined bookies, if any - bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs); + // Update watcher outside ZK callback thread, to avoid deadlock in case some other + // component is trying to do a blocking ZK operation + bk.mainWorkerPool.submitOrdered(path, safeRun(() -> { + synchronized (BookieWatcher.this) { + Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies(); + placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies); + if (bk.conf.getDiskWeightBasedPlacementEnabled()) { + // start collecting bookieInfo for the newly joined bookies, if any + bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs); + } } - } + })); // we don't need to close clients here, because: // a. the dead bookies will be removed from topology, which will not be used in new ensemble. 
@@ -236,13 +242,10 @@ class BookieWatcher implements Watcher, ChildrenCallback { final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); readBookies(new ChildrenCallback() { public void processResult(int rc, String path, Object ctx, List<String> children) { - try { + bk.mainWorkerPool.submitOrdered(path, safeRun(() -> { BookieWatcher.this.processResult(rc, path, ctx, children); - queue.put(rc); - } catch (InterruptedException e) { - logger.error("Interruped when trying to read bookies in a blocking fashion"); - throw new RuntimeException(e); - } + queue.add(rc); + })); } }); int rc = queue.take();
