This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 86f58e8 [BOOKIE-LEDGER-RECOVERY] Fix bookie recovery stuck even with
enough ack-quorum response
86f58e8 is described below
commit 86f58e8a7269c8e51f246386968ceed051375b8f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Mar 23 09:02:26 2020 -0700
[BOOKIE-LEDGER-RECOVERY] Fix bookie recovery stuck even with enough
ack-quorum response
### Motivation
As discussed at https://github.com/apache/pulsar/issues/6505
Bk-client was not able to recover ledger which has 2 write/ack quorum and
one of the bookie went down and recovery was kept failing and bookkeeper client
was not able to recover the ledger.
**BK-Client log**
```
20:44:43.721 [BookKeeperClientWorker-OrderedExecutor-1-0] ERROR
org.apache.bookkeeper.client.ReadLastConfirmedOp - While readLastConfirmed
ledger: 1234567 did not hear success responses from all quorums
20:44:43.721 [bookkeeper-io-12-27] ERROR
org.apache.bookkeeper.proto.PerChannelBookieClient - Could not connect to
bookie: [id: 0xb8b97441, L:/1.1.1.1:1234]/1.1.1.2:3181, current s
tate CONNECTING :
io.netty.channel.AbstractChannel$AnnotatedConnectException:
finishConnect(..) failed: No route to host: /1.1.1.2:3181
at
io.netty.channel.unix.Errors.throwConnectException(Errors.java:112)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.unix.Socket.finishConnect(Socket.java:269)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:665)
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
at
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:642)
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
at
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:522)
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
at
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:423)
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
at
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)
[netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
[netty-common-4.1.31.Final.jar:4.1.31.Final]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[netty-common-4.1.31.Final.jar:4.1.31.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.net.ConnectException: finishConnect(..) failed: No route to
host
```
**Ledger metadata**
```
BookieMetadataFormatVersion 2
quorumSize: 2
ensembleSize: 2
length: 0
lastEntryId: -1
state: IN_RECOVERY
segment {
ensembleMember: "1.1.1.1:3181"
ensembleMember: "1.1.1.2:3181"
firstEntryId: 0
}
digestType: CRC32
```
**Root cause:**
Bookie should be able to recover ledger once it receives the response from
total N (`(Qw - Qa)+1`) bookies. But it was waiting for a successful response
from both quorums.
Reference: https://bookkeeper.apache.org/docs/4.5.0/development/protocol/
### Modification
Bookie should be able to recover ledger once it receives the response from
total N (`(Qw - Qa)+1`) bookies.
Reviewers: Diego Salvi <[email protected]>, Enrico Olivelli
<[email protected]>, Sijie Guo <[email protected]>
This closes #2281 from rdhabalia/recover_q
---
.../bookkeeper/client/ReadLastConfirmedOp.java | 12 ++-
.../client/ParallelLedgerRecoveryTest.java | 90 ++++++++++++++++++++++
.../apache/bookkeeper/test/BookieFailureTest.java | 18 ++---
3 files changed, 107 insertions(+), 13 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index dfbc167..2b75403 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -38,6 +38,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
LedgerHandle lh;
BookieClient bookieClient;
int numResponsesPending;
+ int numSuccessfulResponse;
RecoveryData maxRecoveredData;
volatile boolean completed = false;
int lastSeenError = BKException.Code.ReadException;
@@ -59,6 +60,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
this.bookieClient = bookieClient;
this.maxRecoveredData = new
RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
this.lh = lh;
+ this.numSuccessfulResponse = 0;
this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
this.coverageSet = lh.distributionSchedule.getCoverageSet();
this.currentEnsemble = ensemble;
@@ -99,6 +101,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
maxRecoveredData = recoveryData;
}
heardValidResponse = true;
+ numSuccessfulResponse++;
} catch (BKDigestMatchException e) {
// Too bad, this bookie didn't give us a valid answer, we
// still might be able to recover though so continue
@@ -137,8 +140,15 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
}
if (numResponsesPending == 0 && !completed) {
+ int totalExepctedResponse =
lh.getLedgerMetadata().getWriteQuorumSize()
+ - lh.getLedgerMetadata().getAckQuorumSize() + 1;
+ if (numSuccessfulResponse >= totalExepctedResponse) {
+ cb.readLastConfirmedDataComplete(BKException.Code.OK,
maxRecoveredData);
+ return;
+ }
// Have got all responses back but was still not enough, just fail
the operation
- LOG.error("While readLastConfirmed ledger: {} did not hear success
responses from all quorums", ledgerId);
+ LOG.error("While readLastConfirmed ledger: {} did not hear success
responses from all quorums {}", ledgerId,
+ lastSeenError);
cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 177cbe3..ab69fa7 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -66,6 +66,7 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
@@ -693,4 +694,93 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
readBk.close();
}
+ /**
+ * Validate ledger can recover with response: (Qw - Qa)+1.
+ * @throws Exception
+ */
+ @Test
+ public void testRecoveryWithUnavailableBookie() throws Exception {
+
+ byte[] passwd = "".getBytes(UTF_8);
+ ClientConfiguration newConf = new ClientConfiguration();
+ newConf.addConfiguration(baseClientConf);
+ final BookKeeper readBk = new BookKeeper(newConf);
+ final BookKeeper newBk0 = new BookKeeper(newConf);
+
+ /**
+ * Test Group-1 : Expected Response for recovery: Qr = (Qw - Qa)+1 = (3
+ * -2) + 1 = 2
+ */
+ int ensembleSize = 3;
+ int writeQuorumSize = 3;
+ int ackQuormSize = 2;
+ LedgerHandle lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize,
ackQuormSize, DigestType.DUMMY, passwd);
+ LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(),
DigestType.DUMMY, passwd);
+ // Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS, NOT_AVAILABLE
+ // Expected: Recovery successful Q(response) = 2
+ int responseCode = readLACFromQuorum(readLh,
BKException.Code.BookieHandleNotAvailableException,
+ BKException.Code.OK,
BKException.Code.NoSuchLedgerExistsException);
+ assertEquals(responseCode, BKException.Code.OK);
+ // Test 2: bookie response: OK, NOT_AVAILABLE, NOT_AVAILABLE
+ // Expected: Recovery fail Q(response) = 1
+ responseCode = readLACFromQuorum(readLh,
BKException.Code.BookieHandleNotAvailableException,
+ BKException.Code.OK,
BKException.Code.BookieHandleNotAvailableException);
+ assertEquals(responseCode,
BKException.Code.BookieHandleNotAvailableException);
+
+ /**
+ * Test Group-2 : Expected Response for recovery: Qr = (Qw - Qa)+1 = (2
+ * -2) + 1 = 1
+ */
+ ensembleSize = 2;
+ writeQuorumSize = 2;
+ ackQuormSize = 2;
+ lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize,
DigestType.DUMMY, passwd);
+ readLh = readBk.openLedgerNoRecovery(lh0.getId(), DigestType.DUMMY,
passwd);
+ // Test 1: bookie response: OK, NOT_AVAILABLE
+ // Expected: Recovery successful Q(response) = 1
+ responseCode = readLACFromQuorum(readLh,
BKException.Code.BookieHandleNotAvailableException,
+ BKException.Code.OK);
+ assertEquals(responseCode, BKException.Code.OK);
+
+ // Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS
+ // Expected: Recovery successful Q(response) = 2
+ responseCode = readLACFromQuorum(readLh,
BKException.Code.NoSuchLedgerExistsException, BKException.Code.OK);
+ assertEquals(responseCode, BKException.Code.OK);
+
+ // Test 3: bookie response: NOT_AVAILABLE, NOT_AVAILABLE
+ // Expected: Recovery fail Q(response) = 0
+ responseCode = readLACFromQuorum(readLh,
BKException.Code.BookieHandleNotAvailableException,
+ BKException.Code.BookieHandleNotAvailableException);
+ assertEquals(responseCode,
BKException.Code.BookieHandleNotAvailableException);
+
+ newBk0.close();
+ readBk.close();
+ }
+
+ private int readLACFromQuorum(LedgerHandle ledger, int...
bookieLACResponse) throws Exception {
+ MutableInt responseCode = new MutableInt(100);
+ CountDownLatch responseLatch = new CountDownLatch(1);
+ ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(ledger,
bkc.getBookieClient(),
+
ledger.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
+ new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+ @Override
+ public void readLastConfirmedDataComplete(int rc,
DigestManager.RecoveryData data) {
+ System.out.println("response = " + rc);
+ responseCode.setValue(rc);
+ responseLatch.countDown();
+ }
+ });
+ byte[] lac = new byte[Long.SIZE * 3];
+ ByteBuf data = Unpooled.wrappedBuffer(lac, 0, lac.length);
+ int writerIndex = data.writerIndex();
+ data.resetWriterIndex();
+ data.writeLong(ledger.getId());
+ data.writeLong(0L);
+ data.writerIndex(writerIndex);
+ for (int i = 0; i < bookieLACResponse.length; i++) {
+ readLCOp.readEntryComplete(bookieLACResponse[i], 0, 0, data, i);
+ }
+ responseLatch.await();
+ return responseCode.intValue();
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
index 8a18127..255e4f3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
@@ -313,12 +313,9 @@ public class BookieFailureTest extends
BookKeeperClusterTestCase
killBookie(0);
// try to open ledger no recovery
- try {
- bkc.openLedgerNoRecovery(beforelh2.getId(), digestType,
"".getBytes());
- fail("Should have thrown exception");
- } catch (BKException.BKReadException e) {
- // correct behaviour
- }
+ // should be able to open ledger with one bookie down:
+ // Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
+ bkc.openLedgerNoRecovery(beforelh2.getId(), digestType, "".getBytes());
}
@Test
@@ -351,12 +348,9 @@ public class BookieFailureTest extends
BookKeeperClusterTestCase
killBookie(0);
// try to open ledger no recovery
- try {
- bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
- fail("Should have thrown exception");
- } catch (BKException.BKLedgerRecoveryException e) {
- // correct behaviour
- }
+ // should be able to open ledger with one bookie down:
+ // Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
+ bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
}
/**