This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e6a2a13  ISSUE #1067: PendingReadOp: recovery, return NoSuchEntry on wQ-aQ+1 errors
e6a2a13 is described below

commit e6a2a13876fe676b8fcafa56c3568ea450ad5f67
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Fri Feb 2 16:26:45 2018 -0800

    ISSUE #1067: PendingReadOp: recovery, return NoSuchEntry on wQ-aQ+1 errors
    
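    In the case of a recovery read, we rely on a NoSuchEntry response for the
    first missing entry to determine the final LAC.  As such, we need to
    consider an entry to be gone once we see wQ-aQ+1 NoSuchEntry or
    NoSuchLedger responses from bookies: an entry that was acknowledged to the
    writer is stored on at least aQ of its wQ bookies, so once wQ-aQ+1 of them
    report it missing, it cannot have been acknowledged.  Otherwise, a zombie
    bookie can prevent ledger recovery from succeeding indefinitely (the read
    will hit the timeout each time).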
    
    There was some preexisting logic for this from BOOKKEEPER-365
    (https://issues.apache.org/jira/browse/BOOKKEEPER-365, commit
    b6c1a8bbd7c2d44c2edb59d9938fa073f6f478de), but it seems to have been too
    conservative: it waited for all responses and required that no other
    errors were present.  It now seems to be unnecessary, so it has been
    removed.
    
    TestParallelRead.testFailParallelReadMissingEntryImmediately seemed to
    rely on the previous logic working outside of recovery, but I believe it
    was really meant to test the recovery case, so it has been adjusted to
    issue a recovery read and renamed to
    testFailParallelRecoveryReadMissingEntryImmediately.
    
    (bug W-4651456)
    Signed-off-by: Samuel Just <sj...@salesforce.com>
    
    Author: Samuel Just <sj...@salesforce.com>
    
    Reviewers: Samuel Just <sj...@salesforce.com>, Ivan Kelly <iv...@apache.org>,
    Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1077 from athanatos/forupstream/wip-4651456-recovery-read,
    closes #1067
---
 .../apache/bookkeeper/client/PendingReadOp.java    | 43 +++++++++++-----------
 .../client/ReadLastConfirmedAndEntryOp.java        |  2 +-
 .../org/apache/bookkeeper/client/TestFencing.java  | 35 ++++++++++++++++++
 .../apache/bookkeeper/client/TestParallelRead.java |  4 +-
 4 files changed, 60 insertions(+), 24 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 041ccd1..58ee31d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -74,7 +74,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
     OpStatsLogger readOpLogger;
     private final Counter speculativeReadCounter;
 
-    final int maxMissedReadsAllowed;
+    final int requiredBookiesMissingEntryForRecovery;
     final boolean isRecoveryRead;
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
@@ -85,7 +85,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
 
         int rc = BKException.Code.OK;
         int firstError = BKException.Code.OK;
-        int numMissedEntryReads = 0;
+        int numBookiesMissingEntry = 0;
 
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
@@ -200,7 +200,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             }
             if (BKException.Code.NoSuchEntryException == rc
                 || BKException.Code.NoSuchLedgerExistsException == rc) {
-                ++numMissedEntryReads;
+                ++numBookiesMissingEntry;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No such entry found on bookie.  L{} E{} bookie: 
{}",
                             lh.ledgerId, entryImpl.getEntryId(), host);
@@ -298,14 +298,15 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         @Override
         synchronized void logErrorAndReattemptRead(int bookieIndex, 
BookieSocketAddress host, String errMsg, int rc) {
             super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
-            --numPendings;
             // if received all responses or this entry doesn't meet quorum 
write, complete the request.
-            if (numMissedEntryReads > maxMissedReadsAllowed || numPendings == 
0) {
-                if (BKException.Code.BookieHandleNotAvailableException == 
firstError
-                    && numMissedEntryReads > maxMissedReadsAllowed) {
-                    firstError = BKException.Code.NoSuchEntryException;
-                }
 
+            --numPendings;
+            if (isRecoveryRead && numBookiesMissingEntry >= 
requiredBookiesMissingEntryForRecovery) {
+                /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies 
report that they do not
+                 * have the entry */
+                fail(BKException.Code.NoSuchEntryException);
+            } else if (numPendings == 0) {
+                // if received all responses, complete the request.
                 fail(firstError);
             }
         }
@@ -364,8 +365,9 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             BitSet sentTo = getSentToBitSet();
             sentTo.and(heardFrom);
 
-            // only send another read, if we have had no response at all (even 
for other entries)
-            // from any of the other bookies we have sent the request to
+            // only send another read if we have had no successful response at 
all
+            // (even for other entries) from any of the other bookies we have 
sent the
+            // request to
             if (sentTo.cardinality() == 0) {
                 speculativeReadCounter.inc();
                 return sendNextRead();
@@ -383,14 +385,6 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just 
fail the
                 // read
-
-                // Do it a bit pessimistically, only when finished trying all 
replicas
-                // to check whether we received more missed reads than 
maxMissedReadsAllowed
-                if (BKException.Code.BookieHandleNotAvailableException == 
firstError
-                    && numMissedEntryReads > maxMissedReadsAllowed) {
-                    firstError = BKException.Code.NoSuchEntryException;
-                }
-
                 fail(firstError);
                 return null;
             }
@@ -424,6 +418,13 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             }
             erroredReplicas.set(replica);
 
+            if (isRecoveryRead && (numBookiesMissingEntry >= 
requiredBookiesMissingEntryForRecovery)) {
+                /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies 
report that they do not
+                 * have the entry */
+                fail(BKException.Code.NoSuchEntryException);
+                return;
+            }
+
             if (!readsOutstanding()) {
                 sendNextRead();
             }
@@ -471,8 +472,8 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         this.scheduler = scheduler;
         this.isRecoveryRead = isRecoveryRead;
         numPendingEntries = endEntryId - startEntryId + 1;
-        maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
-                - getLedgerMetadata().getAckQuorumSize();
+        requiredBookiesMissingEntryForRecovery = 
getLedgerMetadata().getWriteQuorumSize()
+                - getLedgerMetadata().getAckQuorumSize() + 1;
         heardFromHosts = new HashSet<>();
         heardFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index aba3a17..a8bc84f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -360,7 +360,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                 // read
 
                 // Do it a bit pessimistically, only when finished trying all 
replicas
-                // to check whether we received more missed reads than 
maxMissedReadsAllowed
+                // to check whether we received more missed reads than 
requiredBookiesMissingEntryForRecovery
                 if (BKException.Code.BookieHandleNotAvailableException == 
firstError
                         && numMissedEntryReads > maxMissedReadsAllowed) {
                     firstError = BKException.Code.NoSuchEntryException;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
index c067689..d0787d5 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
@@ -335,6 +335,41 @@ public class TestFencing extends BookKeeperClusterTestCase 
{
     }
 
     /**
+     * create a ledger and write entries.
+     * sleep a bookie
+     * Ensure that fencing proceeds even with the bookie sleeping
+     */
+    @Test
+    public void testFencingWithHungBookie() throws Exception {
+        LedgerHandle writelh = bkc.createLedger(digestType, 
"testPasswd".getBytes());
+
+        String tmp = "Foobar";
+
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            writelh.addEntry(tmp.getBytes());
+        }
+
+        CountDownLatch sleepLatch = new CountDownLatch(1);
+        sleepBookie(writelh.getLedgerMetadata().getEnsembles().get(0L).get(1), 
sleepLatch);
+
+        LedgerHandle readlh = bkc.openLedger(writelh.getId(),
+                                             digestType, 
"testPasswd".getBytes());
+
+        try {
+            writelh.addEntry(tmp.getBytes());
+            LOG.error("Should have thrown an exception");
+            fail("Should have thrown an exception when trying to write");
+        } catch (BKException.BKLedgerFencedException e) {
+            // correct behaviour
+        }
+
+        sleepLatch.countDown();
+        readlh.close();
+        writelh.close();
+    }
+
+    /**
      * Test that fencing doesn't work with a bad password.
      */
     @Test
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index a12b6dc..d73a973 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -139,7 +139,7 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
     }
 
     @Test
-    public void testFailParallelReadMissingEntryImmediately() throws Exception 
{
+    public void testFailParallelRecoveryReadMissingEntryImmediately() throws 
Exception {
         int numEntries = 1;
 
         long id = getLedgerToRead(5, 5, 3, numEntries);
@@ -160,7 +160,7 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         sleepBookie(ensemble.get(1), latch2);
 
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 10, 10);
+                new PendingReadOp(lh, lh.bk.scheduler, 10, 10, true);
         readOp.parallelRead(true).submit();
         // would fail immediately if found missing entries don't cover ack 
quorum
         expectFail(readOp.future(), Code.NoSuchEntryException);

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to