This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 86f58e8  [BOOKIE-LEDGER-RECOVERY] Fix bookie recovery stuck even with 
enough ack-quorum response
86f58e8 is described below

commit 86f58e8a7269c8e51f246386968ceed051375b8f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Mar 23 09:02:26 2020 -0700

    [BOOKIE-LEDGER-RECOVERY] Fix bookie recovery stuck even with enough 
ack-quorum response
    
    ### Motivation
    As discussed at https://github.com/apache/pulsar/issues/6505
    
    The BookKeeper client was not able to recover a ledger with a write/ack 
quorum of 2 after one of the bookies went down; every recovery attempt kept 
failing.
    
    **BK-Client log**
    
    ```
    20:44:43.721 [BookKeeperClientWorker-OrderedExecutor-1-0] ERROR 
org.apache.bookkeeper.client.ReadLastConfirmedOp - While readLastConfirmed 
ledger: 1234567 did not hear success responses from all quorums
    20:44:43.721 [bookkeeper-io-12-27] ERROR 
org.apache.bookkeeper.proto.PerChannelBookieClient - Could not connect to 
bookie: [id: 0xb8b97441, L:/1.1.1.1:1234]/1.1.1.2:3181, current state CONNECTING :
    io.netty.channel.AbstractChannel$AnnotatedConnectException: 
finishConnect(..) failed: No route to host: /1.1.1.2:3181
            at 
io.netty.channel.unix.Errors.throwConnectException(Errors.java:112) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.unix.Socket.finishConnect(Socket.java:269) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:665)
 [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
            at 
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:642)
 [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
            at 
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:522)
 [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
            at 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:423) 
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
            at 
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330) 
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
 [netty-common-4.1.31.Final.jar:4.1.31.Final]
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.31.Final.jar:4.1.31.Final]
            at java.lang.Thread.run(Thread.java:834) [?:?]
    Caused by: java.net.ConnectException: finishConnect(..) failed: No route to 
host
    ```
    
    **Ledger metadata**
    
    ```
    BookieMetadataFormatVersion 2
    quorumSize: 2
    ensembleSize: 2
    length: 0
    lastEntryId: -1
    state: IN_RECOVERY
    segment {
      ensembleMember: "1.1.1.1:3181"
      ensembleMember: "1.1.1.2:3181"
      firstEntryId: 0
    }
    digestType: CRC32
    ```
    
    **Root cause:**
    Ledger recovery should be able to complete once successful responses have 
been received from at least N = `(Qw - Qa) + 1` bookies. Instead, it was 
waiting for a successful response from every bookie in the quorum.
    Reference: https://bookkeeper.apache.org/docs/4.5.0/development/protocol/
    
    ### Modification
    Allow ledger recovery to succeed when successful responses have been 
received from at least N = `(Qw - Qa) + 1` bookies.
    
    
    Reviewers: Diego Salvi <[email protected]>,  Enrico Olivelli 
<[email protected]>, Sijie Guo <[email protected]>
    
    This closes #2281 from rdhabalia/recover_q
---
 .../bookkeeper/client/ReadLastConfirmedOp.java     | 12 ++-
 .../client/ParallelLedgerRecoveryTest.java         | 90 ++++++++++++++++++++++
 .../apache/bookkeeper/test/BookieFailureTest.java  | 18 ++---
 3 files changed, 107 insertions(+), 13 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index dfbc167..2b75403 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -38,6 +38,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
     LedgerHandle lh;
     BookieClient bookieClient;
     int numResponsesPending;
+    int numSuccessfulResponse;
     RecoveryData maxRecoveredData;
     volatile boolean completed = false;
     int lastSeenError = BKException.Code.ReadException;
@@ -59,6 +60,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
         this.bookieClient = bookieClient;
         this.maxRecoveredData = new 
RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
         this.lh = lh;
+        this.numSuccessfulResponse = 0;
         this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
         this.currentEnsemble = ensemble;
@@ -99,6 +101,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
                     maxRecoveredData = recoveryData;
                 }
                 heardValidResponse = true;
+                numSuccessfulResponse++;
             } catch (BKDigestMatchException e) {
                 // Too bad, this bookie didn't give us a valid answer, we
                 // still might be able to recover though so continue
@@ -137,8 +140,15 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
         }
 
         if (numResponsesPending == 0 && !completed) {
+            int totalExepctedResponse = 
lh.getLedgerMetadata().getWriteQuorumSize()
+                    - lh.getLedgerMetadata().getAckQuorumSize() + 1;
+            if (numSuccessfulResponse >= totalExepctedResponse) {
+                cb.readLastConfirmedDataComplete(BKException.Code.OK, 
maxRecoveredData);
+                return;
+            }
             // Have got all responses back but was still not enough, just fail 
the operation
-            LOG.error("While readLastConfirmed ledger: {} did not hear success 
responses from all quorums", ledgerId);
+            LOG.error("While readLastConfirmed ledger: {} did not hear success 
responses from all quorums {}", ledgerId,
+                    lastSeenError);
             cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData);
         }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 177cbe3..ab69fa7 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -66,6 +66,7 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -693,4 +694,93 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         readBk.close();
     }
 
+    /**
+     * Validate ledger can recover with response: (Qw - Qa)+1.
+     * @throws Exception
+     */
+    @Test
+    public void testRecoveryWithUnavailableBookie() throws Exception {
+
+        byte[] passwd = "".getBytes(UTF_8);
+        ClientConfiguration newConf = new ClientConfiguration();
+        newConf.addConfiguration(baseClientConf);
+        final BookKeeper readBk = new BookKeeper(newConf);
+        final BookKeeper newBk0 = new BookKeeper(newConf);
+
+        /**
+         * Test Group-1 : Expected Response for recovery: Qr = (Qw - Qa)+1 = (3
+         * -2) + 1 = 2
+         */
+        int ensembleSize = 3;
+        int writeQuorumSize = 3;
+        int ackQuormSize = 2;
+        LedgerHandle lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, 
ackQuormSize, DigestType.DUMMY, passwd);
+        LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(), 
DigestType.DUMMY, passwd);
+        // Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS, NOT_AVAILABLE
+        // Expected: Recovery successful Q(response) = 2
+        int responseCode = readLACFromQuorum(readLh, 
BKException.Code.BookieHandleNotAvailableException,
+                BKException.Code.OK, 
BKException.Code.NoSuchLedgerExistsException);
+        assertEquals(responseCode, BKException.Code.OK);
+        // Test 2: bookie response: OK, NOT_AVAILABLE, NOT_AVAILABLE
+        // Expected: Recovery fail Q(response) = 1
+        responseCode = readLACFromQuorum(readLh, 
BKException.Code.BookieHandleNotAvailableException,
+                BKException.Code.OK, 
BKException.Code.BookieHandleNotAvailableException);
+        assertEquals(responseCode, 
BKException.Code.BookieHandleNotAvailableException);
+
+        /**
+         * Test Group-2 : Expected Response for recovery: Qr = (Qw - Qa)+1 = (2
+         * -2) + 1 = 1
+         */
+        ensembleSize = 2;
+        writeQuorumSize = 2;
+        ackQuormSize = 2;
+        lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, 
DigestType.DUMMY, passwd);
+        readLh = readBk.openLedgerNoRecovery(lh0.getId(), DigestType.DUMMY, 
passwd);
+        // Test 1: bookie response: OK, NOT_AVAILABLE
+        // Expected: Recovery successful Q(response) = 1
+        responseCode = readLACFromQuorum(readLh, 
BKException.Code.BookieHandleNotAvailableException,
+                BKException.Code.OK);
+        assertEquals(responseCode, BKException.Code.OK);
+
+        // Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS
+        // Expected: Recovery successful Q(response) = 2
+        responseCode = readLACFromQuorum(readLh, 
BKException.Code.NoSuchLedgerExistsException, BKException.Code.OK);
+        assertEquals(responseCode, BKException.Code.OK);
+
+        // Test 3: bookie response: NOT_AVAILABLE, NOT_AVAILABLE
+        // Expected: Recovery fail Q(response) = 0
+        responseCode = readLACFromQuorum(readLh, 
BKException.Code.BookieHandleNotAvailableException,
+                BKException.Code.BookieHandleNotAvailableException);
+        assertEquals(responseCode, 
BKException.Code.BookieHandleNotAvailableException);
+
+        newBk0.close();
+        readBk.close();
+    }
+
+    private int readLACFromQuorum(LedgerHandle ledger, int... 
bookieLACResponse) throws Exception {
+        MutableInt responseCode = new MutableInt(100);
+        CountDownLatch responseLatch = new CountDownLatch(1);
+        ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(ledger, 
bkc.getBookieClient(),
+                
ledger.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
+                new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+                    @Override
+                    public void readLastConfirmedDataComplete(int rc, 
DigestManager.RecoveryData data) {
+                        System.out.println("response = " + rc);
+                        responseCode.setValue(rc);
+                        responseLatch.countDown();
+                    }
+                });
+        byte[] lac = new byte[Long.SIZE * 3];
+        ByteBuf data = Unpooled.wrappedBuffer(lac, 0, lac.length);
+        int writerIndex = data.writerIndex();
+        data.resetWriterIndex();
+        data.writeLong(ledger.getId());
+        data.writeLong(0L);
+        data.writerIndex(writerIndex);
+        for (int i = 0; i < bookieLACResponse.length; i++) {
+            readLCOp.readEntryComplete(bookieLACResponse[i], 0, 0, data, i);
+        }
+        responseLatch.await();
+        return responseCode.intValue();
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
index 8a18127..255e4f3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
@@ -313,12 +313,9 @@ public class BookieFailureTest extends 
BookKeeperClusterTestCase
         killBookie(0);
 
         // try to open ledger no recovery
-        try {
-            bkc.openLedgerNoRecovery(beforelh2.getId(), digestType, 
"".getBytes());
-            fail("Should have thrown exception");
-        } catch (BKException.BKReadException e) {
-            // correct behaviour
-        }
+        // should be able to open ledger with one bookie down:
+        // Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
+        bkc.openLedgerNoRecovery(beforelh2.getId(), digestType, "".getBytes());
     }
 
     @Test
@@ -351,12 +348,9 @@ public class BookieFailureTest extends 
BookKeeperClusterTestCase
         killBookie(0);
 
         // try to open ledger no recovery
-        try {
-            bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
-            fail("Should have thrown exception");
-        } catch (BKException.BKLedgerRecoveryException e) {
-            // correct behaviour
-        }
+        // should be able to open ledger with one bookie down:
+        // Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
+        bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
     }
 
     /**

Reply via email to