This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 45e4c5e [MERGE YAHOO REPO] CMS-1599: make sure we always release the
underreplicated lock
45e4c5e is described below
commit 45e4c5e1c26284635e1eeac392cd896392e2084e
Author: Siddharth Boobna <[email protected]>
AuthorDate: Fri Jan 19 01:19:34 2018 -0800
[MERGE YAHOO REPO] CMS-1599: make sure we always release the
underreplicated lock
Descriptions of the changes in this PR:
CMS-1599: make sure we always release the underreplicated lock
Here is original commit:
https://github.com/yahoo/bookkeeper/commit/850122ed
Author: Siddharth Boobna <[email protected]>
Author: Jia Zhai <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1010 from jiazhai/cherry_picks/i_120
---
.../bookkeeper/replication/ReplicationWorker.java | 82 +++++++++++++---------
1 file changed, 48 insertions(+), 34 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 014e73c..cd9bce4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -213,59 +213,73 @@ public class ReplicationWorker implements Runnable {
}
}
+ private void logBKExceptionAndReleaseLedger(BKException e, long ledgerIdToReplicate)
+ throws UnavailableException {
+ LOG.info("{} while"
+ + " rereplicating ledger {}."
+ + " Enough Bookies might not have available"
+ + " So, no harm to continue",
+ e.getClass().getSimpleName(),
+ ledgerIdToReplicate);
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ getExceptionCounter(e.getClass().getSimpleName()).inc();
+ }
+
private boolean rereplicate(long ledgerIdToReplicate) throws
InterruptedException, BKException,
UnavailableException {
-
if (LOG.isDebugEnabled()) {
LOG.debug("Going to replicate the fragments of the ledger: {}",
ledgerIdToReplicate);
}
+
+ boolean deferLedgerLockRelease = false;
+
try (LedgerHandle lh =
admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
- Set<LedgerFragment> fragmentsBeforeReplicate = getUnderreplicatedFragments(lh);
+ Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
if (LOG.isDebugEnabled()) {
- LOG.debug("Founds fragments {} for replication from ledger: {}",
- fragmentsBeforeReplicate, ledgerIdToReplicate);
+ LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
}
boolean foundOpenFragments = false;
- long numFragsReplicated = 0;
- for (LedgerFragment ledgerFragment : fragmentsBeforeReplicate) {
+ for (LedgerFragment ledgerFragment : fragments) {
if (!ledgerFragment.isClosed()) {
foundOpenFragments = true;
continue;
}
- LOG.info("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
- admin.replicateLedgerFragment(lh, ledgerFragment);
- numFragsReplicated++;
- }
-
- if (numFragsReplicated > 0) {
- numLedgersReplicated.inc();
+ try {
+ admin.replicateLedgerFragment(lh, ledgerFragment);
+ } catch (BKException.BKBookieHandleNotAvailableException e) {
+ LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e);
+ } catch (BKException.BKLedgerRecoveryException e) {
+ LOG.warn("BKLedgerRecoveryException while replicating the fragment", e);
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ LOG.warn("BKNotEnoughBookiesException while replicating the fragment", e);
+ }
}
if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
+ deferLedgerLockRelease = true;
deferLedgerLockRelease(ledgerIdToReplicate);
return false;
}
- Set<LedgerFragment> fragmentsAfterReplicate = getUnderreplicatedFragments(lh);
- if (fragmentsAfterReplicate.size() == 0) {
- LOG.info("Ledger {} is replicated successfully.", ledgerIdToReplicate);
+ fragments = getUnderreplicatedFragments(lh);
+ if (fragments.size() == 0) {
+ LOG.info("Ledger replicated successfully. ledger id is: " + ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
return true;
} else {
- LOG.info("Fail to replicate ledger {}.", ledgerIdToReplicate);
// Releasing the underReplication ledger lock and compete
// for the replication again for the pending fragments
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
return false;
}
+
} catch (BKNoSuchLedgerExistsException e) {
// Ledger might have been deleted by user
LOG.info("BKNoSuchLedgerExistsException while opening "
- + "ledger {} for replication. Other clients "
- + "might have deleted the ledger. "
- + "So, no harm to continue", ledgerIdToReplicate);
+ + "ledger {} for replication. Other clients "
+ + "might have deleted the ledger. "
+ + "So, no harm to continue", ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
getExceptionCounter("BKNoSuchLedgerExistsException").inc();
return false;
@@ -275,21 +289,21 @@ public class ReplicationWorker implements Runnable {
} catch (BKException e) {
logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
return false;
+ } finally {
+ // we make sure we always release the underreplicated lock, unless we decided to defer it. If the lock has
+ // already been released, this is a no-op
+ if (!deferLedgerLockRelease) {
+ try {
+ underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ } catch (UnavailableException e) {
+ LOG.error("UnavailableException while releasing the underreplicated lock for ledger {}:",
+ ledgerIdToReplicate, e);
+ shutdown();
+ }
+ }
}
}
- private void logBKExceptionAndReleaseLedger(BKException e, long ledgerIdToReplicate)
- throws UnavailableException {
- LOG.info("{} while"
- + " rereplicating ledger {}."
- + " Enough Bookies might not have available"
- + " So, no harm to continue",
- e.getClass().getSimpleName(),
- ledgerIdToReplicate);
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
- getExceptionCounter(e.getClass().getSimpleName()).inc();
- }
/**
* When checking the fragments of a ledger, there is a corner case
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].