This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dc9789  Use ByteBuf for entrylogger reads
1dc9789 is described below

commit 1dc9789e89a6bf167ed965edb1ab51fbc449d897
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 18 09:24:17 2017 +0200

    Use ByteBuf for entrylogger reads
    
    Original patch from merlimat in yahoo-4.3 branch
    
    Author: Matteo Merli <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Ivan Kelly 
<[email protected]>
    
    This patch had conflicts when merged, resolved by
    Committer: Enrico Olivelli <[email protected]>
    
    This closes #640 from ivankelly/refcount-for-read
---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 35 ++++++++++++----------
 .../bookie/InterleavedLedgerStorage.java           |  4 +--
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 12 ++++----
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |  6 ++--
 4 files changed, 31 insertions(+), 26 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 61aa91b..717a8ae 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie;
 import static com.google.common.base.Charsets.UTF_8;
 import static 
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
@@ -854,10 +856,10 @@ public class EntryLogger {
         return logChannel.position() + size > Integer.MAX_VALUE;
     }
 
-    byte[] readEntry(long ledgerId, long entryId, long location) throws 
IOException, Bookie.NoEntryException {
+    ByteBuf readEntry(long ledgerId, long entryId, long location) throws 
IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
-        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+        RecyclableByteBuffer sizeBuff = RecyclableByteBuffer.get();
         pos -= 4; // we want to get the ledgerId and length to check
         BufferedReadChannel fc;
         try {
@@ -868,26 +870,29 @@ public class EntryLogger {
             newe.setStackTrace(e.getStackTrace());
             throw newe;
         }
-        if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != 
sizeBuff.capacity()) {
-            throw new Bookie.NoEntryException("Short read from entrylog " + 
entryLogId, ledgerId, entryId);
+
+        if (readFromLogChannel(entryLogId, fc, sizeBuff.buffer, pos) != 
sizeBuff.buffer.capacity()) {
+            throw new Bookie.NoEntryException("Short read from entrylog " + 
entryLogId,
+                                              ledgerId, entryId);
         }
         pos += 4;
-        sizeBuff.flip();
-        int entrySize = sizeBuff.getInt();
+        sizeBuff.buffer.flip();
+        int entrySize = sizeBuff.buffer.getInt();
+        sizeBuff.recycle();
+
         // entrySize does not include the ledgerId
         if (entrySize > maxSaneEntrySize) {
             LOG.warn("Sanity check failed for entry size of " + entrySize + " 
at location " + pos + " in "
                     + entryLogId);
-
         }
         if (entrySize < MIN_SANE_ENTRY_SIZE) {
             LOG.error("Read invalid entry length {}", entrySize);
             throw new IOException("Invalid entry length " + entrySize);
         }
-        byte data[] = new byte[entrySize];
-        ByteBuffer buff = ByteBuffer.wrap(data);
-        int rc = readFromLogChannel(entryLogId, fc, buff, pos);
-        if (rc != data.length) {
+
+        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entrySize, 
entrySize);
+        int rc = readFromLogChannel(entryLogId, fc, data.nioBuffer(0, 
entrySize), pos);
+        if (rc != entrySize) {
             // Note that throwing NoEntryException here instead of IOException 
is not
             // without risk. If all bookies in a quorum throw this same 
exception
             // the client will assume that it has reached the end of the 
ledger.
@@ -897,15 +902,15 @@ public class EntryLogger {
             // returning NoEntryException is mostly safe.
             throw new Bookie.NoEntryException("Short read for " + ledgerId + 
"@"
                                               + entryId + " in " + entryLogId 
+ "@"
-                                              + pos + "(" + rc + "!=" + 
data.length + ")", ledgerId, entryId);
+                                              + pos + "(" + rc + "!=" + 
entrySize + ")", ledgerId, entryId);
         }
-        buff.flip();
-        long thisLedgerId = buff.getLong();
+        data.writerIndex(entrySize);
+        long thisLedgerId = data.getLong(0);
         if (thisLedgerId != ledgerId) {
             throw new IOException("problem found in " + entryLogId + "@" + 
entryId + " at position + " + pos
                     + " entry belongs to " + thisLedgerId + " not " + 
ledgerId);
         }
-        long thisEntryId = buff.getLong();
+        long thisEntryId = data.getLong(8);
         if (thisEntryId != entryId) {
             throw new IOException("problem found in " + entryLogId + "@" + 
entryId + " at position + " + pos
                     + " entry is " + thisEntryId + " not " + entryId);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index f773e05..19a6105 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -316,9 +316,9 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
         startTimeNanos = MathUtils.nowInNano();
         success = false;
         try {
-            byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
+            ByteBuf retBytes = entryLogger.readEntry(ledgerId, entryId, 
offset);
             success = true;
-            return Unpooled.wrappedBuffer(retBytes);
+            return retBytes;
         } finally {
             if (success) {
                 
getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 95a5ed3..e939123 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -152,12 +152,12 @@ public class EntryLogTest {
         for (int i=0; i<2*numLogs; i++) {
             for (int j=0; j<numEntries; j++) {
                 String expectedValue = "ledger-" + i + "-" + j;
-                byte[] value = newLogger.readEntry(i, j, positions[i][j]);
-                ByteBuffer buf = ByteBuffer.wrap(value);
-                long ledgerId = buf.getLong();
-                long entryId = buf.getLong();
-                byte[] data = new byte[buf.remaining()];
-                buf.get(data);
+                ByteBuf value = newLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = value.readLong();
+                long entryId = value.readLong();
+                byte[] data = new byte[value.readableBytes()];
+                value.readBytes(data);
+                value.release();
                 assertEquals(i, ledgerId);
                 assertEquals(j, entryId);
                 assertEquals(expectedValue, new String(data));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 3f5d565..5f62ecc 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -303,9 +303,9 @@ public class LedgerCacheTest {
 
         assertFalse("After flush index file should be changed", 
before.equals(after));
         // Verify written entries
-        Assert.assertArrayEquals(generateEntry(1, 1).array(), 
ledgerStorage.getEntry(1, 1).array());
-        Assert.assertArrayEquals(generateEntry(1, 2).array(), 
ledgerStorage.getEntry(1, 2).array());
-        Assert.assertArrayEquals(generateEntry(1, 3).array(), 
ledgerStorage.getEntry(1, 3).array());
+        Assert.assertEquals(generateEntry(1, 1), ledgerStorage.getEntry(1, 1));
+        Assert.assertEquals(generateEntry(1, 2), ledgerStorage.getEntry(1, 2));
+        Assert.assertEquals(generateEntry(1, 3), ledgerStorage.getEntry(1, 3));
     }
 
     /**

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to