This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8c10e56 V2 decoder marks the reads index of the payload on write
8c10e56 is described below
commit 8c10e56db285788eeb17a4290a417a0745114a77
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon Feb 19 18:26:03 2018 -0800
V2 decoder marks the reads index of the payload on write
This means that any storage implementation that receives the payload
can reset the reader index without jumping into bytes it knows
nothing about.
This was the source of an issue when using the V2 protocol with
DbLedgerStorage, in which the data could not be successfully read back
from the ledger.
Author: Ivan Kelly <[email protected]>
Reviewers: Matteo Merli <[email protected]>, Sijie Guo <[email protected]>
This closes #1180 from ivankelly/broken-v2-rocks
---
.../bookie/storage/ldb/DbLedgerStorage.java | 5 ++---
.../bookkeeper/proto/BookieProtoEncoding.java | 3 +++
.../storage/ldb/DbLedgerStorageBookieTest.java | 23 ++++++++++++++++++++++
3 files changed, 28 insertions(+), 3 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index ec7a3b7..59d2b1b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -308,9 +308,8 @@ public class DbLedgerStorage implements
CompactableLedgerStorage {
public long addEntry(ByteBuf entry) throws IOException {
long startTime = MathUtils.nowInNano();
- long ledgerId = entry.readLong();
- long entryId = entry.readLong();
- entry.resetReaderIndex();
+ long ledgerId = entry.getLong(entry.readerIndex());
+ long entryId = entry.getLong(entry.readerIndex() + 8);
if (log.isDebugEnabled()) {
log.debug("Add entry. {}@{}", ledgerId, entryId);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 24ef117..375caba 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -167,6 +167,9 @@ public class BookieProtoEncoding {
// Read ledger and entry id without advancing the reader index
ledgerId = packet.getLong(packet.readerIndex());
entryId = packet.getLong(packet.readerIndex() + 8);
+ // mark the reader index so that any resets will return to the
+ // start of the payload
+ packet.markReaderIndex();
return BookieProtocol.ParsedAddRequest.create(
version, ledgerId, entryId, flags,
masterKey, packet.retain());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
index da7f32a..a2162bf 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
@@ -22,15 +22,21 @@ package org.apache.bookkeeper.bookie.storage.ldb;
import static org.junit.Assert.assertEquals;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Unit test for {@link DbLedgerStorageBookieTest}.
*/
public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase {
+ static final Logger LOG =
LoggerFactory.getLogger(DbLedgerStorageBookieTest.class);
public DbLedgerStorageBookieTest() {
super(1);
@@ -49,4 +55,21 @@ public class DbLedgerStorageBookieTest extends
BookKeeperClusterTestCase {
assertEquals(0, lh2.getLength());
assertEquals(-1, lh2.getLastAddConfirmed());
}
+
+ @Test
+ public void testV2ReadWrite() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setUseV2WireProtocol(true);
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+
+ BookKeeper bkc = new BookKeeper(conf);
+ LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.CRC32, new
byte[0]);
+ lh1.addEntry("Foobar".getBytes());
+ lh1.close();
+
+ LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.CRC32, new
byte[0]);
+ assertEquals(0, lh2.getLastAddConfirmed());
+ assertEquals(new String(lh2.readEntries(0,
0).nextElement().getEntry()),
+ "Foobar");
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].