This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f782a9d ISSUE #1578: Fixed deadlock in auditor blocking ZK thread
f782a9d is described below
commit f782a9d818a12479d08c580a68b2566715da4c89
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Aug 21 11:42:14 2018 -0700
ISSUE #1578: Fixed deadlock in auditor blocking ZK thread
### Motivation
Fixes #1578
After getting a ZK callback on the ZK event thread, we need to jump to a
background thread before making the synchronous call to
`admin.openLedgerNoRecovery(ledgerId);`, which will try to make a ZK request and
wait for a response (which would be delivered through the same ZK event thread
that is currently blocked).
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #1608 from merlimat/fix-auditor-deadlock, closes #1578
---
.../org/apache/bookkeeper/replication/Auditor.java | 83 ++++++++++++----------
1 file changed, 44 insertions(+), 39 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 15868d9..8578a5b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -640,47 +641,51 @@ public class Auditor {
return;
}
- LedgerHandle lh = null;
- try {
- lh = admin.openLedgerNoRecovery(ledgerId);
- checker.checkLedger(lh,
- new ProcessLostFragmentsCb(lh, callback),
- conf.getAuditorLedgerVerificationPercentage());
- // we collect the following stats to get a measure of
the
- // distribution of a single ledger within the bk
cluster
- // the higher the number of fragments/bookies, the
more distributed it is
-
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
-
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
- numLedgersChecked.inc();
- } catch (BKException.BKNoSuchLedgerExistsException bknsle)
{
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ledger was deleted before we could
check it", bknsle);
- }
- callback.processResult(BKException.Code.OK,
- null, null);
- return;
- } catch (BKException bke) {
- LOG.error("Couldn't open ledger " + ledgerId, bke);
-
callback.processResult(BKException.Code.BookieHandleNotAvailableException,
- null, null);
- return;
- } catch (InterruptedException ie) {
- LOG.error("Interrupted opening ledger", ie);
- Thread.currentThread().interrupt();
-
callback.processResult(BKException.Code.InterruptedException, null, null);
- return;
- } finally {
- if (lh != null) {
- try {
- lh.close();
- } catch (BKException bke) {
- LOG.warn("Couldn't close ledger " + ledgerId,
bke);
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted closing ledger " +
ledgerId, ie);
- Thread.currentThread().interrupt();
+ // Do not perform blocking calls that involve making ZK
calls from within the ZK
+ // event thread. Jump to background thread instead to
avoid deadlock.
+ ForkJoinPool.commonPool().execute(() -> {
+ LedgerHandle lh = null;
+ try {
+ lh = admin.openLedgerNoRecovery(ledgerId);
+ checker.checkLedger(lh,
+ new ProcessLostFragmentsCb(lh, callback),
+
conf.getAuditorLedgerVerificationPercentage());
+ // we collect the following stats to get a measure
of the
+ // distribution of a single ledger within the bk
cluster
+ // the higher the number of fragments/bookies, the
more distributed it is
+
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+ numLedgersChecked.inc();
+ } catch (BKException.BKNoSuchLedgerExistsException
bknsle) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ledger was deleted before we could
check it", bknsle);
+ }
+ callback.processResult(BKException.Code.OK,
+ null, null);
+ return;
+ } catch (BKException bke) {
+ LOG.error("Couldn't open ledger " + ledgerId, bke);
+
callback.processResult(BKException.Code.BookieHandleNotAvailableException,
+ null, null);
+ return;
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted opening ledger", ie);
+ Thread.currentThread().interrupt();
+
callback.processResult(BKException.Code.InterruptedException, null, null);
+ return;
+ } finally {
+ if (lh != null) {
+ try {
+ lh.close();
+ } catch (BKException bke) {
+ LOG.warn("Couldn't close ledger " +
ledgerId, bke);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted closing ledger " +
ledgerId, ie);
+ Thread.currentThread().interrupt();
+ }
}
}
- }
+ });
}
};