sijie closed pull request #1077: ISSUE #1067: PendingReadOp: recovery, return 
NoSuchEntry on wQ-aQ+1 errors
URL: https://github.com/apache/bookkeeper/pull/1077
 
 
   

This PR was merged from a forked repository. Because GitHub does not show
the original diff of a fork-based pull request once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 041ccd1d2..58ee31dcd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -74,7 +74,7 @@
     OpStatsLogger readOpLogger;
     private final Counter speculativeReadCounter;
 
-    final int maxMissedReadsAllowed;
+    final int requiredBookiesMissingEntryForRecovery;
     final boolean isRecoveryRead;
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
@@ -85,7 +85,7 @@
 
         int rc = BKException.Code.OK;
         int firstError = BKException.Code.OK;
-        int numMissedEntryReads = 0;
+        int numBookiesMissingEntry = 0;
 
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
@@ -200,7 +200,7 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, 
BookieSocketAddress
             }
             if (BKException.Code.NoSuchEntryException == rc
                 || BKException.Code.NoSuchLedgerExistsException == rc) {
-                ++numMissedEntryReads;
+                ++numBookiesMissingEntry;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No such entry found on bookie.  L{} E{} bookie: 
{}",
                             lh.ledgerId, entryImpl.getEntryId(), host);
@@ -298,14 +298,15 @@ void read() {
         @Override
         synchronized void logErrorAndReattemptRead(int bookieIndex, 
BookieSocketAddress host, String errMsg, int rc) {
             super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
-            --numPendings;
             // if received all responses or this entry doesn't meet quorum 
write, complete the request.
-            if (numMissedEntryReads > maxMissedReadsAllowed || numPendings == 
0) {
-                if (BKException.Code.BookieHandleNotAvailableException == 
firstError
-                    && numMissedEntryReads > maxMissedReadsAllowed) {
-                    firstError = BKException.Code.NoSuchEntryException;
-                }
 
+            --numPendings;
+            if (isRecoveryRead && numBookiesMissingEntry >= 
requiredBookiesMissingEntryForRecovery) {
+                /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies 
report that they do not
+                 * have the entry */
+                fail(BKException.Code.NoSuchEntryException);
+            } else if (numPendings == 0) {
+                // if received all responses, complete the request.
                 fail(firstError);
             }
         }
@@ -364,8 +365,9 @@ synchronized BookieSocketAddress 
maybeSendSpeculativeRead(BitSet heardFrom) {
             BitSet sentTo = getSentToBitSet();
             sentTo.and(heardFrom);
 
-            // only send another read, if we have had no response at all (even 
for other entries)
-            // from any of the other bookies we have sent the request to
+            // only send another read if we have had no successful response at 
all
+            // (even for other entries) from any of the other bookies we have 
sent the
+            // request to
             if (sentTo.cardinality() == 0) {
                 speculativeReadCounter.inc();
                 return sendNextRead();
@@ -383,14 +385,6 @@ synchronized BookieSocketAddress sendNextRead() {
             if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just 
fail the
                 // read
-
-                // Do it a bit pessimistically, only when finished trying all 
replicas
-                // to check whether we received more missed reads than 
maxMissedReadsAllowed
-                if (BKException.Code.BookieHandleNotAvailableException == 
firstError
-                    && numMissedEntryReads > maxMissedReadsAllowed) {
-                    firstError = BKException.Code.NoSuchEntryException;
-                }
-
                 fail(firstError);
                 return null;
             }
@@ -424,6 +418,13 @@ synchronized void logErrorAndReattemptRead(int 
bookieIndex, BookieSocketAddress
             }
             erroredReplicas.set(replica);
 
+            if (isRecoveryRead && (numBookiesMissingEntry >= 
requiredBookiesMissingEntryForRecovery)) {
+                /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies 
report that they do not
+                 * have the entry */
+                fail(BKException.Code.NoSuchEntryException);
+                return;
+            }
+
             if (!readsOutstanding()) {
                 sendNextRead();
             }
@@ -471,8 +472,8 @@ boolean complete(int bookieIndex, BookieSocketAddress host, 
ByteBuf buffer) {
         this.scheduler = scheduler;
         this.isRecoveryRead = isRecoveryRead;
         numPendingEntries = endEntryId - startEntryId + 1;
-        maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
-                - getLedgerMetadata().getAckQuorumSize();
+        requiredBookiesMissingEntryForRecovery = 
getLedgerMetadata().getWriteQuorumSize()
+                - getLedgerMetadata().getAckQuorumSize() + 1;
         heardFromHosts = new HashSet<>();
         heardFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index aba3a17d4..a8bc84f4e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -360,7 +360,7 @@ synchronized BookieSocketAddress sendNextRead() {
                 // read
 
                 // Do it a bit pessimistically, only when finished trying all 
replicas
-                // to check whether we received more missed reads than 
maxMissedReadsAllowed
+                // to check whether we received more missed reads than 
requiredBookiesMissingEntryForRecovery
                 if (BKException.Code.BookieHandleNotAvailableException == 
firstError
                         && numMissedEntryReads > maxMissedReadsAllowed) {
                     firstError = BKException.Code.NoSuchEntryException;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
index c067689b8..d0787d515 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
@@ -334,6 +334,41 @@ public void testFencingInteractionWithBookieRecovery2() 
throws Exception {
         writelh.close();
     }
 
+    /**
+     * Create a ledger and write entries.
+     * Put one bookie to sleep.
+     * Ensure that fencing proceeds even while that bookie is sleeping.
+     */
+    @Test
+    public void testFencingWithHungBookie() throws Exception {
+        LedgerHandle writelh = bkc.createLedger(digestType, 
"testPasswd".getBytes());
+
+        String tmp = "Foobar";
+
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            writelh.addEntry(tmp.getBytes());
+        }
+
+        CountDownLatch sleepLatch = new CountDownLatch(1);
+        sleepBookie(writelh.getLedgerMetadata().getEnsembles().get(0L).get(1), 
sleepLatch);
+
+        LedgerHandle readlh = bkc.openLedger(writelh.getId(),
+                                             digestType, 
"testPasswd".getBytes());
+
+        try {
+            writelh.addEntry(tmp.getBytes());
+            LOG.error("Should have thrown an exception");
+            fail("Should have thrown an exception when trying to write");
+        } catch (BKException.BKLedgerFencedException e) {
+            // correct behaviour
+        }
+
+        sleepLatch.countDown();
+        readlh.close();
+        writelh.close();
+    }
+
     /**
      * Test that fencing doesn't work with a bad password.
      */
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index a12b6dce4..d73a97375 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -139,7 +139,7 @@ public void testParallelReadMissingEntries() throws 
Exception {
     }
 
     @Test
-    public void testFailParallelReadMissingEntryImmediately() throws Exception 
{
+    public void testFailParallelRecoveryReadMissingEntryImmediately() throws 
Exception {
         int numEntries = 1;
 
         long id = getLedgerToRead(5, 5, 3, numEntries);
@@ -160,7 +160,7 @@ public void testFailParallelReadMissingEntryImmediately() 
throws Exception {
         sleepBookie(ensemble.get(1), latch2);
 
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 10, 10);
+                new PendingReadOp(lh, lh.bk.scheduler, 10, 10, true);
         readOp.parallelRead(true).submit();
         // would fail immediately if found missing entries don't cover ack 
quorum
         expectFail(readOp.future(), Code.NoSuchEntryException);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to