This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new cc5770d Issue #1195 Handle optional fields gracefully in
ReadExplicitLac response
cc5770d is described below
commit cc5770d60f06e09f3b032dcdec266234a20eab97
Author: JV Jujjuri <[email protected]>
AuthorDate: Mon Feb 26 00:23:10 2018 -0800
Issue #1195 Handle optional fields gracefully in ReadExplicitLac response
Descriptions of the changes in this PR:
ExplicitLac and header of the last entry are optional fields in the
protocol.
Handle them as optionals in the response processing.
TestPendingReadLacOp.java test added by: Samuel Just <sjustsalesforce.com>
Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
Signed-off-by: Samuel Just <sjustsalesforce.com>
Master Issue: #1195
Author: JV Jujjuri <[email protected]>
Author: JV <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1196 from jvrao/bk-issue-1195-lacfix, closes #1195
(cherry picked from commit 8f6373e60fd77f3c6253657b5eff9b970a0d2143)
Signed-off-by: Sijie Guo <[email protected]>
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 28 +++---
.../apache/bookkeeper/client/PendingReadLacOp.java | 20 ++--
.../bookkeeper/proto/ReadLacProcessorV3.java | 24 ++++-
.../apache/bookkeeper/client/BookKeeperTest.java | 18 ++--
.../bookkeeper/client/TestPendingReadLacOp.java | 112 +++++++++++++++++++++
5 files changed, 167 insertions(+), 35 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 2207fc5..e523b37 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -884,7 +884,7 @@ public class LedgerHandle implements WriteHandle {
/**
* Add entry asynchronously to an open ledger, using an offset and range.
* This can be used only with {@link LedgerHandleAdv} returned through
- * ledgers created with {@link BookKeeper#createLedgerAdv(int, int, int,
DigestType, byte[])}.
+ * ledgers created with {@link createLedgerAdv(int, int, int, DigestType,
byte[])}.
*
* @param entryId
* entryId of the entry to add.
@@ -1275,11 +1275,12 @@ public class LedgerHandle implements WriteHandle {
/**
* Obtains asynchronously the explicit last add confirmed from a quorum of
- * bookies. This call obtains the the explicit last add confirmed each
- * bookie has received for this ledger and returns the maximum. If in the
- * write LedgerHandle, explicitLAC feature is not enabled then this will
- * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit
- * lastaddconfirmed is greater than getLastAddConfirmed, then it updates
the
+ * bookies. This call obtains Explicit LAC value and piggy-backed LAC
value (just like
+ * {@link #asyncReadLastConfirmed(ReadLastConfirmedCallback, Object)})
from each
+ * bookie in the ensemble and returns the maximum.
+ * If in the write LedgerHandle, explicitLAC feature is not enabled then
this call behavior
+ * will be similar to {@link
#asyncReadLastConfirmed(ReadLastConfirmedCallback, Object)}.
+ * If the read explicit lastaddconfirmed is greater than
getLastAddConfirmed, then it updates the
* lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it
* returns the value of the last add confirmed from the metadata.
*
@@ -1320,13 +1321,13 @@ public class LedgerHandle implements WriteHandle {
new PendingReadLacOp(this, innercb).initiate();
}
- /**
+ /*
* Obtains synchronously the explicit last add confirmed from a quorum of
- * bookies. This call obtains the the explicit last add confirmed each
- * bookie has received for this ledger and returns the maximum. If in the
- * write LedgerHandle, explicitLAC feature is not enabled then this will
- * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit
- * lastaddconfirmed is greater than getLastAddConfirmed, then it updates
the
+ * bookies. This call obtains Explicit LAC value and piggy-backed LAC
value (just like
+ * {@link #readLastAddConfirmed()}) from each bookie in the ensemble and
returns the maximum.
+ * If in the write LedgerHandle, explicitLAC feature is not enabled then
this call behavior
+ * will be similar to {@link #readLastAddConfirmed()}.
+ * If the read explicit lastaddconfirmed is greater than
getLastAddConfirmed, then it updates the
* lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it
* returns the value of the last add confirmed from the metadata.
*
@@ -1334,8 +1335,7 @@ public class LedgerHandle implements WriteHandle {
*
* @return The entry id of the explicit last confirmed write or
* {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been
- * confirmed or if explicitLAC feature is not enabled in write
- * LedgerHandle.
+ * confirmed.
* @throws InterruptedException
* @throws BKException
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index 1dffb15..7d6a12e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -94,16 +94,22 @@ class PendingReadLacOp implements ReadLacCallback {
// This routine picks both of them and compares to return
// the latest Lac.
+ // lacBuffer and lastEntryBuffer are optional in the protocol.
+ // So check if they exist before processing them.
+
// Extract lac from FileInfo on the ledger.
- long lac = lh.macManager.verifyDigestAndReturnLac(lacBuffer);
- if (lac > maxLac) {
- maxLac = lac;
+ if (lacBuffer != null && lacBuffer.readableBytes() > 0) {
+ long lac =
lh.macManager.verifyDigestAndReturnLac(lacBuffer);
+ if (lac > maxLac) {
+ maxLac = lac;
+ }
}
-
// Extract lac from last entry on the disk
- RecoveryData recoveryData =
lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
- if (recoveryData.lastAddConfirmed > maxLac) {
- maxLac = recoveryData.lastAddConfirmed;
+ if (lastEntryBuffer != null && lastEntryBuffer.readableBytes()
> 0) {
+ RecoveryData recoveryData =
lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
+ if (recoveryData.lastAddConfirmed > maxLac) {
+ maxLac = recoveryData.lastAddConfirmed;
+ }
}
heardValidResponse = true;
} catch (BKDigestMatchException e) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 01ebead..72a62ed 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -65,13 +65,9 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3
implements Runnable {
ByteBuf lastEntry = null;
ByteBuf lac = null;
try {
- lastEntry = requestProcessor.bookie.readEntry(ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED);
lac = requestProcessor.bookie.getExplicitLac(ledgerId);
if (lac != null) {
readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer()));
-
readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
- } else {
- status = StatusCode.ENOENTRY;
}
} catch (Bookie.NoLedgerException e) {
status = StatusCode.ENOLEDGER;
@@ -80,10 +76,28 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3
implements Runnable {
status = StatusCode.EIO;
logger.error("IOException while performing readLac from ledger:
{}", ledgerId);
} finally {
- ReferenceCountUtil.release(lastEntry);
ReferenceCountUtil.release(lac);
}
+ try {
+ lastEntry = requestProcessor.bookie.readEntry(ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED);
+ if (lastEntry != null) {
+
readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
+ }
+ } catch (Bookie.NoLedgerException e) {
+ status = StatusCode.ENOLEDGER;
+ logger.error("No ledger found while trying to read last entry:
{}", ledgerId, e);
+ } catch (IOException e) {
+ status = StatusCode.EIO;
+ logger.error("IOException while trying to read last entry: {}",
ledgerId, e);
+ } finally {
+ ReferenceCountUtil.release(lastEntry);
+ }
+
+ if ((lac == null) && (lastEntry == null)) {
+ status = StatusCode.ENOENTRY;
+ }
+
if (status == StatusCode.EOK) {
requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 3527575..aa57d32 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -340,13 +340,6 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
}
Thread.sleep(3000);
- // since explicitlacflush policy is not enabled for writeledgerhandle,
when we try
- // to read explicitlac for rlh, it will be
LedgerHandle.INVALID_ENTRY_ID. But it
- // wont throw some exception.
- long explicitlac = rlh.readExplicitLastConfirmed();
- Assert.assertTrue(
- "Expected Explicit LAC of rlh: " +
LedgerHandle.INVALID_ENTRY_ID + " actual ExplicitLAC of rlh: " + explicitlac,
- (explicitlac == LedgerHandle.INVALID_ENTRY_ID));
Assert.assertTrue(
"Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual
LAC of rlh: " + wlh.getLastAddConfirmed(),
(wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
@@ -354,9 +347,16 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
"Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of
rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+ // since explicitlacflush policy is not enabled for writeledgerhandle,
when we try
+ // to read explicitlac for rlh, it will be reading up to the piggyback
value.
+ long explicitlac = rlh.readExplicitLastConfirmed();
+ assertTrue(
+ "Expected Explicit LAC of rlh: " + (numOfEntries - 2) + "
actual ExplicitLAC of rlh: " + explicitlac,
+ (explicitlac == (2 * numOfEntries - 2)));
+
try {
- rlh.readEntries(numOfEntries - 1, numOfEntries - 1);
- fail("rlh readEntries beyond " + (numOfEntries - 2) + " should
fail with ReadException");
+ rlh.readEntries(2 * numOfEntries - 1, 2 * numOfEntries - 1);
+ fail("rlh readEntries beyond " + (2 * numOfEntries - 2) + " should
fail with ReadException");
} catch (BKException.BKReadException readException) {
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
new file mode 100644
index 0000000..c534fdd
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests PendingReadLacOp internals.
+ */
+public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestPendingReadLacOp.class);
+ byte pwd[] = "asdf".getBytes();
+ byte data[] = "foo".getBytes();
+
+ public TestPendingReadLacOp() {
+ super(3);
+ }
+
+ @Test
+ public void testPendingReadLacOpMissingExplicitLAC() throws Exception {
+ LedgerHandle lh = bkc.createLedger(3, 3, 2,
BookKeeper.DigestType.CRC32, pwd);
+ lh.append(data);
+ lh.append(data);
+ lh.append(data);
+
+ final CompletableFuture<Long> result = new CompletableFuture<>();
+ PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) ->
result.complete(lac)) {
+ @Override
+ public void initiate() {
+ for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+ final int index = i;
+ ByteBuf buffer =
lh.getDigestManager().computeDigestAndPackageForSending(
+ 2,
+ 1,
+ data.length,
+ Unpooled.wrappedBuffer(data));
+ bkc.scheduler.schedule(() -> {
+ readLacComplete(
+ 0,
+ lh.getId(),
+ null,
+ Unpooled.copiedBuffer(buffer),
+ index);
+
+ }, 0, TimeUnit.SECONDS);
+
lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+ lh.ledgerId, this, i);
+ }
+ }
+ };
+ pro.initiate();
+ assertEquals(1, result.get().longValue());
+ }
+
+ @Test
+ public void testPendingReadLacOpMissingLAC() throws Exception {
+ LedgerHandle lh = bkc.createLedger(3, 3, 2, BookKeeper.DigestType.MAC,
pwd);
+ lh.append(data);
+ lh.append(data);
+ lh.append(data);
+
+ final CompletableFuture<Long> result = new CompletableFuture<>();
+ PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) ->
result.complete(lac)) {
+ @Override
+ public void initiate() {
+ for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+ final int index = i;
+ ByteBuf buffer =
lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
+ bkc.scheduler.schedule(() -> {
+ readLacComplete(
+ 0,
+ lh.getId(),
+ buffer,
+ null,
+ index);
+ }, 0, TimeUnit.SECONDS);
+
lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+ lh.ledgerId, this, i);
+ }
+ }
+ };
+ pro.initiate();
+ assertEquals(result.get().longValue(), 1);
+ }
+}
--
To stop receiving notification emails like this one, please contact
[email protected].