This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 45e4c5e  [MERGE YAHOO REPO] CMS-1599: make sure we always release the 
underreplicated lock
45e4c5e is described below

commit 45e4c5e1c26284635e1eeac392cd896392e2084e
Author: Siddharth Boobna <[email protected]>
AuthorDate: Fri Jan 19 01:19:34 2018 -0800

    [MERGE YAHOO REPO] CMS-1599: make sure we always release the 
underreplicated lock
    
    Descriptions of the changes in this PR:
    
    CMS-1599: make sure we always release the underreplicated lock
    
    Here is original commit:
    https://github.com/yahoo/bookkeeper/commit/850122ed
    
    Author: Siddharth Boobna <[email protected]>
    Author: Jia Zhai <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1010 from jiazhai/cherry_picks/i_120
---
 .../bookkeeper/replication/ReplicationWorker.java  | 82 +++++++++++++---------
 1 file changed, 48 insertions(+), 34 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 014e73c..cd9bce4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -213,59 +213,73 @@ public class ReplicationWorker implements Runnable {
         }
     }
 
+    private void logBKExceptionAndReleaseLedger(BKException e, long 
ledgerIdToReplicate)
+        throws UnavailableException {
+        LOG.info("{} while"
+                + " rereplicating ledger {}."
+                + " Enough Bookies might not have available"
+                + " So, no harm to continue",
+            e.getClass().getSimpleName(),
+            ledgerIdToReplicate);
+        underreplicationManager
+            .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+        getExceptionCounter(e.getClass().getSimpleName()).inc();
+    }
+
     private boolean rereplicate(long ledgerIdToReplicate) throws 
InterruptedException, BKException,
             UnavailableException {
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Going to replicate the fragments of the ledger: {}", 
ledgerIdToReplicate);
         }
+
+        boolean deferLedgerLockRelease = false;
+
         try (LedgerHandle lh = 
admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
-            Set<LedgerFragment> fragmentsBeforeReplicate = 
getUnderreplicatedFragments(lh);
+            Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Founds fragments {} for replication from ledger: 
{}",
-                    fragmentsBeforeReplicate, ledgerIdToReplicate);
+                LOG.debug("Founds fragments {} for replication from ledger: 
{}", fragments, ledgerIdToReplicate);
             }
 
             boolean foundOpenFragments = false;
-            long numFragsReplicated = 0;
-            for (LedgerFragment ledgerFragment : fragmentsBeforeReplicate) {
+            for (LedgerFragment ledgerFragment : fragments) {
                 if (!ledgerFragment.isClosed()) {
                     foundOpenFragments = true;
                     continue;
                 }
-                LOG.info("Going to replicate the fragments of the ledger: {}", 
ledgerIdToReplicate);
-                admin.replicateLedgerFragment(lh, ledgerFragment);
-                numFragsReplicated++;
-            }
-
-            if (numFragsReplicated > 0) {
-                numLedgersReplicated.inc();
+                try {
+                    admin.replicateLedgerFragment(lh, ledgerFragment);
+                } catch (BKException.BKBookieHandleNotAvailableException e) {
+                    LOG.warn("BKBookieHandleNotAvailableException while 
replicating the fragment", e);
+                } catch (BKException.BKLedgerRecoveryException e) {
+                    LOG.warn("BKLedgerRecoveryException while replicating the 
fragment", e);
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("BKNotEnoughBookiesException while replicating 
the fragment", e);
+                }
             }
 
             if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
+                deferLedgerLockRelease = true;
                 deferLedgerLockRelease(ledgerIdToReplicate);
                 return false;
             }
 
-            Set<LedgerFragment> fragmentsAfterReplicate = 
getUnderreplicatedFragments(lh);
-            if (fragmentsAfterReplicate.size() == 0) {
-                LOG.info("Ledger {} is replicated successfully.", 
ledgerIdToReplicate);
+            fragments = getUnderreplicatedFragments(lh);
+            if (fragments.size() == 0) {
+                LOG.info("Ledger replicated successfully. ledger id is: " + 
ledgerIdToReplicate);
                 
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
                 return true;
             } else {
-                LOG.info("Fail to replicate ledger {}.", ledgerIdToReplicate);
                 // Releasing the underReplication ledger lock and compete
                 // for the replication again for the pending fragments
-                underreplicationManager
-                        .releaseUnderreplicatedLedger(ledgerIdToReplicate);
                 return false;
             }
+
         } catch (BKNoSuchLedgerExistsException e) {
             // Ledger might have been deleted by user
             LOG.info("BKNoSuchLedgerExistsException while opening "
-                    + "ledger {} for replication. Other clients "
-                    + "might have deleted the ledger. "
-                    + "So, no harm to continue", ledgerIdToReplicate);
+                + "ledger {} for replication. Other clients "
+                + "might have deleted the ledger. "
+                + "So, no harm to continue", ledgerIdToReplicate);
             underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
             getExceptionCounter("BKNoSuchLedgerExistsException").inc();
             return false;
@@ -275,21 +289,21 @@ public class ReplicationWorker implements Runnable {
         } catch (BKException e) {
             logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
             return false;
+        } finally {
+            // we make sure we always release the underreplicated lock, unless 
we decided to defer it. If the lock has
+            // already been released, this is a no-op
+            if (!deferLedgerLockRelease) {
+                try {
+                    
underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
+                } catch (UnavailableException e) {
+                    LOG.error("UnavailableException while releasing the 
underreplicated lock for ledger {}:",
+                        ledgerIdToReplicate, e);
+                    shutdown();
+                }
+            }
         }
     }
 
-    private void logBKExceptionAndReleaseLedger(BKException e, long 
ledgerIdToReplicate)
-            throws UnavailableException {
-        LOG.info("{} while"
-                + " rereplicating ledger {}."
-                + " Enough Bookies might not have available"
-                + " So, no harm to continue",
-            e.getClass().getSimpleName(),
-            ledgerIdToReplicate);
-        underreplicationManager
-                .releaseUnderreplicatedLedger(ledgerIdToReplicate);
-        getExceptionCounter(e.getClass().getSimpleName()).inc();
-    }
 
     /**
      * When checking the fragments of a ledger, there is a corner case

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to