This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b7bfac8  Refactored EntryLogger Buffered channel
b7bfac8 is described below

commit b7bfac88ffae8acd82cae05fe6236f5a7db3a796
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Dec 8 18:19:23 2017 -0800

    Refactored EntryLogger Buffered channel
    
    Porting https://github.com/yahoo/bookkeeper/commit/ecce14449b07e0b40434835bbc01d75d0a36880b from the Yahoo branch.
    
    Refactored entry logger to always use pooled `ByteBuf` when reading entries or scanning the log.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>
    
    This closes #824 from merlimat/entry-logger-refactor and squashes the following commits:
    
    6ddb31d6 [Matteo Merli] Fix DoubleByteBuf readableBytes()
    a6df2974 [Matteo Merli] Fixed padding length and journal test
    bbe65eb8 [Matteo Merli] Removed BufferedReadChannel close since the buffer was not pooled anyway
    c50f4683 [Matteo Merli] Added required changes to uses of BufferedChannel from Journal
    6746d038 [Matteo Merli] Refactored EntryLogger Buffered channel
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  41 +--
 .../apache/bookkeeper/bookie/BufferedChannel.java  |  97 +++----
 .../bookkeeper/bookie/BufferedChannelBase.java     |   2 +-
 .../bookkeeper/bookie/BufferedReadChannel.java     |  41 +--
 .../bookkeeper/bookie/EntryLogCompactor.java       |  11 +-
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 321 ++++++++++-----------
 .../apache/bookkeeper/bookie/EntryMemTable.java    |   2 +-
 .../bookie/InterleavedLedgerStorage.java           |  10 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  38 ++-
 .../apache/bookkeeper/bookie/SkipListFlusher.java  |   5 +-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |   3 +-
 .../bookie/TransactionalEntryLogCompactor.java     |  19 +-
 .../org/apache/bookkeeper/util/DoubleByteBuf.java  |   5 -
 .../bookkeeper/bookie/BookieJournalTest.java       |  75 +++--
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |  24 +-
 .../bookkeeper/bookie/TestEntryMemTable.java       |  13 +-
 .../org/apache/bookkeeper/bookie/UpgradeTest.java  |   5 +-
 .../apache/bookkeeper/util/DoubleByteBufTest.java  |  25 ++
 18 files changed, 374 insertions(+), 363 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index fa4d410..1d19a30 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -23,6 +23,10 @@ import static com.google.common.base.Charsets.UTF_8;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -52,6 +56,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
@@ -102,6 +107,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Bookie Shell is to provide utilities for users to administer a bookkeeper 
cluster.
  */
@@ -2449,7 +2455,7 @@ public class BookieShell implements Tool {
                 return true;
             }
             @Override
-            public void process(long ledgerId, long startPos, ByteBuffer 
entry) {
+            public void process(long ledgerId, long startPos, ByteBuf entry) {
                 formatEntry(startPos, entry, printMsg);
             }
         });
@@ -2476,10 +2482,9 @@ public class BookieShell implements Tool {
             }
 
             @Override
-            public void process(long ledgerId, long startPos, ByteBuffer 
entry) {
-                long entrysLedgerId = entry.getLong();
-                long entrysEntryId = entry.getLong();
-                entry.rewind();
+            public void process(long ledgerId, long startPos, ByteBuf entry) {
+                long entrysLedgerId = entry.getLong(entry.readerIndex());
+                long entrysEntryId = entry.getLong(entry.readerIndex() + 8);
                 if ((ledgerId == entrysLedgerId) && ((entrysEntryId == 
entryId)) || (entryId == -1)) {
                     entryFound.setValue(true);
                     formatEntry(startPos, entry, printMsg);
@@ -2515,12 +2520,12 @@ public class BookieShell implements Tool {
             }
 
             @Override
-            public void process(long ledgerId, long entryStartPos, ByteBuffer 
entry) {
+            public void process(long ledgerId, long entryStartPos, ByteBuf 
entry) {
                 if (!stopScanning.booleanValue()) {
                     if ((rangeEndPos != -1) && (entryStartPos > rangeEndPos)) {
                         stopScanning.setValue(true);
                     } else {
-                        int entrySize = entry.limit();
+                        int entrySize = entry.readableBytes();
                         /**
                          * entrySize of an entry (inclusive of payload and
                          * header) value is stored as int value in log file, 
but
@@ -2562,7 +2567,7 @@ public class BookieShell implements Tool {
                     System.out.println("Journal Version : " + journalVersion);
                     printJournalVersion = true;
                 }
-                formatEntry(offset, entry, printMsg);
+                formatEntry(offset, Unpooled.wrappedBuffer(entry), printMsg);
             }
         });
     }
@@ -2608,17 +2613,17 @@ public class BookieShell implements Tool {
      * @param printMsg
      *          Whether printing the message body
      */
-    private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
-        long ledgerId = recBuff.getLong();
-        long entryId = recBuff.getLong();
-        int entrySize = recBuff.limit();
+    private void formatEntry(long pos, ByteBuf recBuff, boolean printMsg) {
+        int entrySize = recBuff.readableBytes();
+        long ledgerId = recBuff.readLong();
+        long entryId = recBuff.readLong();
 
         System.out.println("--------- Lid=" + ledgerId + ", Eid=" + entryId
                          + ", ByteOffset=" + pos + ", EntrySize=" + entrySize 
+ " ---------");
         if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
-            int masterKeyLen = recBuff.getInt();
+            int masterKeyLen = recBuff.readInt();
             byte[] masterKey = new byte[masterKeyLen];
-            recBuff.get(masterKey);
+            recBuff.readBytes(masterKey);
             System.out.println("Type:           META");
             System.out.println("MasterKey:      " + bytes2Hex(masterKey));
             System.out.println();
@@ -2631,7 +2636,7 @@ public class BookieShell implements Tool {
             return;
         }
         // process a data entry
-        long lastAddConfirmed = recBuff.getLong();
+        long lastAddConfirmed = recBuff.readLong();
         System.out.println("Type:           DATA");
         System.out.println("LastConfirmed:  " + lastAddConfirmed);
         if (!printMsg) {
@@ -2639,12 +2644,12 @@ public class BookieShell implements Tool {
             return;
         }
         // skip digest checking
-        recBuff.position(32 + 8);
+        recBuff.skipBytes(8);
         System.out.println("Data:");
         System.out.println();
         try {
-            byte[] ret = new byte[recBuff.remaining()];
-            recBuff.get(ret);
+            byte[] ret = new byte[recBuff.readableBytes()];
+            recBuff.readBytes(ret);
             formatter.formatEntry(ret);
         } catch (Exception e) {
             System.out.println("N/A. Corrupted.");
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 10affae..0d21d41 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -21,22 +21,25 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.bookkeeper.util.ZeroBuffer;
 
 /**
  * Provides a buffering layer in front of a FileChannel.
  */
-public class BufferedChannel extends BufferedReadChannel {
+public class BufferedChannel extends BufferedReadChannel implements Closeable {
     // The capacity of the write buffer.
     protected final int writeCapacity;
     // The position of the file channel's write pointer.
     protected AtomicLong writeBufferStartPosition = new AtomicLong(0);
     // The buffer used to write operations.
-    protected final ByteBuffer writeBuffer;
+    protected final ByteBuf writeBuffer;
     // The absolute position of the next write operation.
     protected volatile long position;
 
@@ -48,12 +51,15 @@ public class BufferedChannel extends BufferedReadChannel {
 
     public BufferedChannel(FileChannel fc, int writeCapacity, int 
readCapacity) throws IOException {
         super(fc, readCapacity);
-        // Set the read buffer's limit to readCapacity.
-        this.readBuffer.limit(readCapacity);
         this.writeCapacity = writeCapacity;
         this.position = fc.position();
         this.writeBufferStartPosition.set(position);
-        this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
+        this.writeBuffer = 
ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+    }
+
+    @Override
+    public void close() throws IOException {
+        writeBuffer.release();
     }
 
     /**
@@ -64,19 +70,16 @@ public class BufferedChannel extends BufferedReadChannel {
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    public synchronized void write(ByteBuffer src) throws IOException {
+    public synchronized void write(ByteBuf src) throws IOException {
         int copied = 0;
-        while (src.remaining() > 0) {
-            int truncated = 0;
-            if (writeBuffer.remaining() < src.remaining()) {
-                truncated = src.remaining() - writeBuffer.remaining();
-                src.limit(src.limit() - truncated);
-            }
-            copied += src.remaining();
-            writeBuffer.put(src);
-            src.limit(src.limit() + truncated);
+        int len = src.readableBytes();
+        while (copied < len) {
+            int bytesToCopy = Math.min(src.readableBytes() - copied, 
writeBuffer.writableBytes());
+            writeBuffer.writeBytes(src, src.readerIndex() + copied, 
bytesToCopy);
+            copied += bytesToCopy;
+
             // if we have run out of buffer space, we should flush to the file
-            if (writeBuffer.remaining() == 0) {
+            if (!writeBuffer.isWritable()) {
                 flushInternal();
             }
         }
@@ -121,10 +124,10 @@ public class BufferedChannel extends BufferedReadChannel {
      * @throws IOException if the write fails.
      */
     private void flushInternal() throws IOException {
-        writeBuffer.flip();
+        ByteBuffer toWrite = writeBuffer.internalNioBuffer(0, 
writeBuffer.writerIndex());
         do {
-            fileChannel.write(writeBuffer);
-        } while (writeBuffer.hasRemaining());
+            fileChannel.write(toWrite);
+        } while (toWrite.hasRemaining());
         writeBuffer.clear();
         writeBufferStartPosition.set(fileChannel.position());
     }
@@ -140,57 +143,41 @@ public class BufferedChannel extends BufferedReadChannel {
     }
 
     @Override
-    public synchronized int read(ByteBuffer dest, long pos) throws IOException 
{
+    public synchronized int read(ByteBuf dest, long pos, int length) throws 
IOException {
         long prevPos = pos;
-        while (dest.remaining() > 0) {
+        while (length > 0) {
             // check if it is in the write buffer
             if (writeBuffer != null && writeBufferStartPosition.get() <= pos) {
-                long positionInBuffer = pos - writeBufferStartPosition.get();
-                long bytesToCopy = writeBuffer.position() - positionInBuffer;
-                if (bytesToCopy > dest.remaining()) {
-                    bytesToCopy = dest.remaining();
-                }
+                int positionInBuffer = (int) (pos - 
writeBufferStartPosition.get());
+                int bytesToCopy = Math.min(writeBuffer.writerIndex() - 
positionInBuffer, dest.writableBytes());
+
                 if (bytesToCopy == 0) {
                     throw new IOException("Read past EOF");
                 }
-                ByteBuffer src = writeBuffer.duplicate();
-                src.position((int) positionInBuffer);
-                src.limit((int) (positionInBuffer + bytesToCopy));
-                dest.put(src);
+
+                dest.writeBytes(writeBuffer, positionInBuffer, bytesToCopy);
                 pos += bytesToCopy;
+                length -= bytesToCopy;
             } else if (writeBuffer == null && writeBufferStartPosition.get() 
<= pos) {
                 // here we reach the end
                 break;
                 // first check if there is anything we can grab from the 
readBuffer
-            } else if (readBufferStartPosition <= pos && pos < 
readBufferStartPosition + readBuffer.capacity()) {
-                long positionInBuffer = pos - readBufferStartPosition;
-                long bytesToCopy = readBuffer.capacity() - positionInBuffer;
-                if (bytesToCopy > dest.remaining()) {
-                    bytesToCopy = dest.remaining();
-                }
-                ByteBuffer src = readBuffer.duplicate();
-                src.position((int) positionInBuffer);
-                src.limit((int) (positionInBuffer + bytesToCopy));
-                dest.put(src);
+            } else if (readBufferStartPosition <= pos && pos < 
readBufferStartPosition + readBuffer.writerIndex()) {
+                int positionInBuffer = (int) (pos - readBufferStartPosition);
+                int bytesToCopy = Math.min(readBuffer.writerIndex() - 
positionInBuffer, dest.writableBytes());
+                dest.writeBytes(readBuffer, positionInBuffer, bytesToCopy);
                 pos += bytesToCopy;
+                length -= bytesToCopy;
                 // let's read it
             } else {
                 readBufferStartPosition = pos;
-                readBuffer.clear();
-                // make sure that we don't overlap with the write buffer
-                if (readBufferStartPosition + readBuffer.capacity() >= 
writeBufferStartPosition.get()) {
-                    readBufferStartPosition = writeBufferStartPosition.get() - 
readBuffer.capacity();
-                    if (readBufferStartPosition < 0) {
-                        ZeroBuffer.put(readBuffer, (int) 
-readBufferStartPosition);
-                    }
-                }
-                while (readBuffer.remaining() > 0) {
-                    if (fileChannel.read(readBuffer, readBufferStartPosition + 
readBuffer.position()) <= 0) {
-                        throw new IOException("Short read");
-                    }
+
+                int readBytes = 
fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity),
+                        readBufferStartPosition);
+                if (readBytes <= 0) {
+                    throw new IOException("Reading from filechannel returned a 
non-positive value. Short read.");
                 }
-                ZeroBuffer.put(readBuffer);
-                readBuffer.clear();
+                readBuffer.writerIndex(readBytes);
             }
         }
         return (int) (pos - prevPos);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
index ef4d6f3..87e1d43 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 
 /**
- * A {@code BufferedChannelBase} adds functionlity to an existing file 
channel, the ability
+ * A {@code BufferedChannelBase} adds functionality to an existing file 
channel, the ability
  * to buffer the input and output data. This class is a base class for 
wrapping the {@link FileChannel}.
  */
 public abstract class BufferedChannelBase {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
index 64557d1..96dea6f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
@@ -21,19 +21,24 @@
 
 package org.apache.bookkeeper.bookie;
 
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
  * A Buffered channel without a write buffer. Only reads are buffered.
  */
-public class BufferedReadChannel extends BufferedChannelBase {
+public class BufferedReadChannel extends BufferedChannelBase  {
 
     // The capacity of the read buffer.
     protected final int readCapacity;
+
     // The buffer for read operations.
-    protected ByteBuffer readBuffer;
+    protected final ByteBuf readBuffer;
+
     // The starting position of the data currently in the read buffer.
     protected long readBufferStartPosition = Long.MIN_VALUE;
 
@@ -43,8 +48,7 @@ public class BufferedReadChannel extends BufferedChannelBase {
     public BufferedReadChannel(FileChannel fileChannel, int readCapacity) 
throws IOException {
         super(fileChannel);
         this.readCapacity = readCapacity;
-        this.readBuffer = ByteBuffer.allocateDirect(readCapacity);
-        this.readBuffer.limit(0);
+        this.readBuffer = Unpooled.buffer(readCapacity);
     }
 
     /**
@@ -57,7 +61,11 @@ public class BufferedReadChannel extends BufferedChannelBase 
{
      *         -1 if the given position is greater than or equal to the file's 
current size.
      * @throws IOException if I/O error occurs
      */
-    public synchronized int read(ByteBuffer dest, long pos) throws IOException 
{
+    public int read(ByteBuf dest, long pos) throws IOException {
+        return read(dest, pos, dest.writableBytes());
+    }
+
+    public synchronized int read(ByteBuf dest, long pos, int length) throws 
IOException {
         invocationCount++;
         long currentPosition = pos;
         long eof = validateAndGetFileChannel().size();
@@ -65,30 +73,28 @@ public class BufferedReadChannel extends 
BufferedChannelBase {
         if (pos >= eof) {
             return -1;
         }
-        while (dest.remaining() > 0) {
+        while (length > 0) {
             // Check if the data is in the buffer, if so, copy it.
             if (readBufferStartPosition <= currentPosition
-                    && currentPosition < readBufferStartPosition + 
readBuffer.limit()) {
-                long posInBuffer = currentPosition - readBufferStartPosition;
-                long bytesToCopy = Math.min(dest.remaining(), 
readBuffer.limit() - posInBuffer);
-                ByteBuffer rbDup = readBuffer.duplicate();
-                rbDup.position((int) posInBuffer);
-                rbDup.limit((int) (posInBuffer + bytesToCopy));
-                dest.put(rbDup);
+                    && currentPosition < readBufferStartPosition + 
readBuffer.readableBytes()) {
+                int posInBuffer = (int) (currentPosition - 
readBufferStartPosition);
+                int bytesToCopy = Math.min(length, readBuffer.readableBytes() 
- posInBuffer);
+                dest.writeBytes(readBuffer, posInBuffer, bytesToCopy);
                 currentPosition += bytesToCopy;
+                length -= bytesToCopy;
                 cacheHitCount++;
             } else if (currentPosition >= eof) {
                 // here we reached eof.
                 break;
             } else {
                 // We don't have it in the buffer, so put necessary data in 
the buffer
-                readBuffer.clear();
                 readBufferStartPosition = currentPosition;
                 int readBytes = 0;
-                if ((readBytes = validateAndGetFileChannel().read(readBuffer, 
currentPosition)) <= 0) {
+                if ((readBytes = 
validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
+                        currentPosition)) <= 0) {
                     throw new IOException("Reading from filechannel returned a 
non-positive value. Short read.");
                 }
-                readBuffer.limit(readBytes);
+                readBuffer.writerIndex(readBytes);
             }
         }
         return (int) (currentPosition - pos);
@@ -96,7 +102,6 @@ public class BufferedReadChannel extends BufferedChannelBase 
{
 
     public synchronized void clear() {
         readBuffer.clear();
-        readBuffer.limit(0);
     }
 
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index c31b989..88e22bf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -21,8 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -85,15 +86,13 @@ public class EntryLogCompactor extends AbstractLogCompactor 
{
                 }
 
                 @Override
-                public void process(final long ledgerId, long offset, 
ByteBuffer entry) throws IOException {
-                    throttler.acquire(entry.remaining());
+                public void process(final long ledgerId, long offset, ByteBuf 
entry) throws IOException {
+                    throttler.acquire(entry.readableBytes());
 
                     if (offsets.size() > maxOutstandingRequests) {
                         flush();
                     }
-                    entry.getLong(); // discard ledger id, we already have it
-                    long entryId = entry.getLong();
-                    entry.rewind();
+                    long entryId = entry.getLong(entry.readerIndex() + 8);
 
                     long newoffset = entryLogger.addEntry(ledgerId, entry);
                     offsets.add(new EntryLocation(ledgerId, entryId, 
newoffset));
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index c00c84c..d229151 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -28,9 +28,10 @@ import static 
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 import com.google.common.collect.MapMaker;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.FastThreadLocal;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -60,7 +61,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -164,7 +164,7 @@ public class EntryLogger {
      * </pre>
      */
     static final int LOGFILE_HEADER_SIZE = 1024;
-    final ByteBuffer logfileHeader = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+    final ByteBuf logfileHeader = Unpooled.buffer(LOGFILE_HEADER_SIZE);
 
     static final int HEADER_VERSION_POSITION = 4;
     static final int LEDGERS_MAP_OFFSET_POSITION = HEADER_VERSION_POSITION + 4;
@@ -222,10 +222,10 @@ public class EntryLogger {
          * @param offset
          *          File offset of this entry.
          * @param entry
-         *          Entry ByteBuffer
+         *          Entry ByteBuf
          * @throws IOException
          */
-        void process(long ledgerId, long offset, ByteBuffer entry) throws 
IOException;
+        void process(long ledgerId, long offset, ByteBuf entry) throws 
IOException;
     }
 
     /**
@@ -265,8 +265,9 @@ public class EntryLogger {
         // within the same JVM. All of these Bookie instances access this 
header
         // so there can be race conditions when entry logs are rolled over and
         // this header buffer is cleared before writing it into the new 
logChannel.
-        logfileHeader.put("BKLO".getBytes(UTF_8));
-        logfileHeader.putInt(HEADER_CURRENT_VERSION);
+        logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
+        logfileHeader.writeInt(HEADER_CURRENT_VERSION);
+        logfileHeader.writerIndex(LOGFILE_HEADER_SIZE);
 
         // Find the largest logId
         long logId = INVALID_LID;
@@ -307,13 +308,13 @@ public class EntryLogger {
      * @param pos The starting position from where we want to read.
      * @return
      */
-    private int readFromLogChannel(long entryLogId, BufferedReadChannel 
channel, ByteBuffer buff, long pos)
+    private int readFromLogChannel(long entryLogId, BufferedReadChannel 
channel, ByteBuf buff, long pos)
             throws IOException {
         BufferedLogChannel bc = logChannel;
         if (null != bc) {
             if (entryLogId == bc.getLogId()) {
                 synchronized (bc) {
-                    if (pos + buff.remaining() >= bc.getFileChannelPosition()) 
{
+                    if (pos + buff.writableBytes() >= 
bc.getFileChannelPosition()) {
                         return bc.read(buff, pos);
                     }
                 }
@@ -513,9 +514,9 @@ public class EntryLogger {
         int numberOfLedgers = (int) ledgersMap.size();
 
         // Write the ledgers map into several batches
-        final AtomicLong currentOffset = new AtomicLong(ledgerMapOffset);
+
         final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        final ByteBuffer serializedMap = ByteBuffer.allocate(maxMapSize);
+        final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
 
         try {
             ledgersMap.forEach(new BiConsumerLong() {
@@ -530,25 +531,23 @@ public class EntryLogger {
                         int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
 
                         serializedMap.clear();
-                        serializedMap.putInt(ledgerMapSize - 4);
-                        serializedMap.putLong(INVALID_LID);
-                        serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
-                        serializedMap.putInt(batchSize);
+                        serializedMap.writeInt(ledgerMapSize - 4);
+                        serializedMap.writeLong(INVALID_LID);
+                        serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+                        serializedMap.writeInt(batchSize);
 
                         startNewBatch = false;
                         remainingInBatch = batchSize;
                     }
                     // Dump the ledger in the current batch
-                    serializedMap.putLong(ledgerId);
-                    serializedMap.putLong(size);
+                    serializedMap.writeLong(ledgerId);
+                    serializedMap.writeLong(size);
                     --remainingLedgers;
 
                     if (--remainingInBatch == 0) {
                         // Close current batch
-                        serializedMap.flip();
                         try {
-                            int written = 
entryLogChannel.fileChannel.write(serializedMap, currentOffset.get());
-                            currentOffset.addAndGet(written);
+                            entryLogChannel.write(serializedMap);
                         } catch (IOException e) {
                             throw new RuntimeException(e);
                         }
@@ -563,6 +562,8 @@ public class EntryLogger {
             } else {
                 throw e;
             }
+        } finally {
+            serializedMap.release();
         }
 
         // Update the headers with the map offset and count of ledgers
@@ -662,7 +663,8 @@ public class EntryLogger {
             FileChannel channel = new RandomAccessFile(newLogFile, 
"rw").getChannel();
             BufferedLogChannel logChannel = new BufferedLogChannel(channel,
                     conf.getWriteBufferBytes(), conf.getReadBufferBytes(), 
preallocatedLogId, newLogFile);
-            logChannel.write((ByteBuffer) logfileHeader.clear());
+            logfileHeader.readerIndex(0);
+            logChannel.write(logfileHeader);
 
             for (File f : list) {
                 setLastLogId(f, preallocatedLogId);
@@ -851,11 +853,22 @@ public class EntryLogger {
     }
 
     long addEntry(long ledger, ByteBuffer entry) throws IOException {
+        return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
+    }
+
+    long addEntry(long ledger, ByteBuf entry) throws IOException {
         return addEntry(ledger, entry, true);
     }
 
-    synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) 
throws IOException {
-        int entrySize = entry.remaining() + 4;
+    private final FastThreadLocal<ByteBuf> sizeBuffer = new 
FastThreadLocal<ByteBuf>() {
+        @Override
+        protected ByteBuf initialValue() throws Exception {
+            return Unpooled.buffer(4);
+        }
+    };
+
+    synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
+        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
         boolean reachEntryLogLimit =
             rollLog ? reachEntryLogLimit(entrySize) : 
readEntryLogHardLimit(entrySize);
         // Create new log if logSizeLimit reached or current disk is full
@@ -871,12 +884,11 @@ public class EntryLogger {
             }
         }
 
-        // Get a buffer from recyclable pool to store the size
-        RecyclableByteBuffer recyclableBuffer = RecyclableByteBuffer.get();
-        recyclableBuffer.buffer.putInt(entry.remaining());
-        recyclableBuffer.buffer.flip();
-        logChannel.write(recyclableBuffer.buffer);
-        recyclableBuffer.recycle();
+        // Get a buffer from thread local to store the size
+        ByteBuf sizeBuffer = this.sizeBuffer.get();
+        sizeBuffer.clear();
+        sizeBuffer.writeInt(entry.readableBytes());
+        logChannel.write(sizeBuffer);
 
         long pos = logChannel.position();
         logChannel.write(entry);
@@ -887,16 +899,18 @@ public class EntryLogger {
         return (logChannel.getLogId() << 32L) | pos;
     }
 
-    long addEntryForCompaction(long ledgerId, ByteBuffer entry) throws 
IOException {
+    long addEntryForCompaction(long ledgerId, ByteBuf entry) throws 
IOException {
         synchronized (compactionLogLock) {
-            int entrySize = entry.remaining() + 4;
+            int entrySize = entry.readableBytes() + 4;
             if (compactionLogChannel == null) {
                 createNewCompactionLog();
             }
-            ByteBuffer buff = ByteBuffer.allocate(4);
-            buff.putInt(entry.remaining());
-            buff.flip();
-            compactionLogChannel.write(buff);
+
+            ByteBuf sizeBuffer = this.sizeBuffer.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            compactionLogChannel.write(sizeBuffer);
+
             long pos = compactionLogChannel.position();
             compactionLogChannel.write(entry);
             compactionLogChannel.registerWrittenEntry(ledgerId, entrySize);
@@ -950,32 +964,6 @@ public class EntryLogger {
     }
 
 
-    private static final class RecyclableByteBuffer {
-        private static final Recycler<RecyclableByteBuffer> RECYCLER = new  
Recycler<RecyclableByteBuffer>() {
-            @Override
-            protected RecyclableByteBuffer 
newObject(Handle<RecyclableByteBuffer> handle) {
-                return new RecyclableByteBuffer(handle);
-            }
-        };
-
-        private final ByteBuffer buffer;
-        private final Handle<RecyclableByteBuffer> handle;
-        public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) {
-            this.buffer = ByteBuffer.allocate(4);
-            this.handle = handle;
-        }
-
-        public static RecyclableByteBuffer get() {
-            return RECYCLER.get();
-        }
-
-        public void recycle() {
-            buffer.rewind();
-            handle.recycle(this);
-        }
-    }
-
-
     private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws 
IOException {
         if (!doRegularFlushes) {
             return;
@@ -1001,7 +989,8 @@ public class EntryLogger {
     ByteBuf readEntry(long ledgerId, long entryId, long location) throws 
IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
-        RecyclableByteBuffer sizeBuff = RecyclableByteBuffer.get();
+        ByteBuf sizeBuff = sizeBuffer.get();
+        sizeBuff.clear();
         pos -= 4; // we want to get the ledgerId and length to check
         BufferedReadChannel fc;
         try {
@@ -1013,14 +1002,12 @@ public class EntryLogger {
             throw newe;
         }
 
-        if (readFromLogChannel(entryLogId, fc, sizeBuff.buffer, pos) != 
sizeBuff.buffer.capacity()) {
+        if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != 
sizeBuff.capacity()) {
             throw new Bookie.NoEntryException("Short read from entrylog " + 
entryLogId,
                                               ledgerId, entryId);
         }
         pos += 4;
-        sizeBuff.buffer.flip();
-        int entrySize = sizeBuff.buffer.getInt();
-        sizeBuff.recycle();
+        int entrySize = sizeBuff.readInt();
 
         // entrySize does not include the ledgerId
         if (entrySize > maxSaneEntrySize) {
@@ -1033,7 +1020,7 @@ public class EntryLogger {
         }
 
         ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entrySize, 
entrySize);
-        int rc = readFromLogChannel(entryLogId, fc, data.nioBuffer(0, 
entrySize), pos);
+        int rc = readFromLogChannel(entryLogId, fc, data, pos);
         if (rc != entrySize) {
             // Note that throwing NoEntryException here instead of IOException 
is not
             // without risk. If all bookies in a quorum throw this same 
exception
@@ -1042,6 +1029,7 @@ public class EntryLogger {
             // could have occurred, where the length of the entry was 
corrupted on all
             // replicas. However, the chance of this happening is very very 
low, so
             // returning NoEntryException is mostly safe.
+            data.release();
             throw new Bookie.NoEntryException("Short read for " + ledgerId + 
"@"
                                               + entryId + " in " + entryLogId 
+ "@"
                                               + pos + "(" + rc + "!=" + 
entrySize + ")", ledgerId, entryId);
@@ -1049,11 +1037,13 @@ public class EntryLogger {
         data.writerIndex(entrySize);
         long thisLedgerId = data.getLong(0);
         if (thisLedgerId != ledgerId) {
+            data.release();
             throw new IOException("problem found in " + entryLogId + "@" + 
entryId + " at position + " + pos
                     + " entry belongs to " + thisLedgerId + " not " + 
ledgerId);
         }
         long thisEntryId = data.getLong(8);
         if (thisEntryId != entryId) {
+            data.release();
             throw new IOException("problem found in " + entryLogId + "@" + 
entryId + " at position + " + pos
                     + " entry is " + thisEntryId + " not " + entryId);
         }
@@ -1068,21 +1058,24 @@ public class EntryLogger {
         BufferedReadChannel bc = getChannelForLogId(entryLogId);
 
         // Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
-        ByteBuffer headers = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
-        bc.read(headers, 0);
-        headers.flip();
+        ByteBuf headers = 
PooledByteBufAllocator.DEFAULT.directBuffer(LOGFILE_HEADER_SIZE);
+        try {
+            bc.read(headers, 0);
 
-        // Skip marker string "BKLO"
-        headers.getInt();
+            // Skip marker string "BKLO"
+            headers.readInt();
 
-        int headerVersion = headers.getInt();
-        if (headerVersion < HEADER_V0 || headerVersion > 
HEADER_CURRENT_VERSION) {
-            LOG.info("Unknown entry log header version for log {}: {}", 
entryLogId, headerVersion);
-        }
+            int headerVersion = headers.readInt();
+            if (headerVersion < HEADER_V0 || headerVersion > 
HEADER_CURRENT_VERSION) {
+                LOG.info("Unknown entry log header version for log {}: {}", 
entryLogId, headerVersion);
+            }
 
-        long ledgersMapOffset = headers.getLong();
-        int ledgersCount = headers.getInt();
-        return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+            long ledgersMapOffset = headers.readLong();
+            int ledgersCount = headers.readInt();
+            return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+        } finally {
+            headers.release();
+        }
     }
 
     private BufferedReadChannel getChannelForLogId(long entryLogId) throws 
IOException {
@@ -1137,8 +1130,8 @@ public class EntryLogger {
      * @throws IOException
      */
     protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) 
throws IOException {
-        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
-        ByteBuffer lidBuff = ByteBuffer.allocate(8);
+        // Buffer where to read the entrySize (4 bytes) and the ledgerId (8 
bytes)
+        ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
         BufferedReadChannel bc;
         // Get the BufferedChannel for the current entry log file
         try {
@@ -1151,49 +1144,50 @@ public class EntryLogger {
         // the header where all of the ledger entries are.
         long pos = LOGFILE_HEADER_SIZE;
 
-        // Read through the entry log file and extract the ledger ID's.
-        while (true) {
-            // Check if we've finished reading the entry log file.
-            if (pos >= bc.size()) {
-                break;
-            }
-            if (readFromLogChannel(entryLogId, bc, sizeBuff, pos) != 
sizeBuff.capacity()) {
-                LOG.warn("Short read for entry size from entrylog {}", 
entryLogId);
-                return;
-            }
-            long offset = pos;
-            pos += 4;
-            sizeBuff.flip();
-            int entrySize = sizeBuff.getInt();
-
-            sizeBuff.clear();
-            // try to read ledger id first
-            if (readFromLogChannel(entryLogId, bc, lidBuff, pos) != 
lidBuff.capacity()) {
-                LOG.warn("Short read for ledger id from entrylog {}", 
entryLogId);
-                return;
-            }
-            lidBuff.flip();
-            long lid = lidBuff.getLong();
-            lidBuff.clear();
-            if (lid == INVALID_LID || !scanner.accept(lid)) {
-                // skip this entry
+        // Start with a reasonably sized buffer size
+        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 
1024);
+
+        try {
+
+            // Read through the entry log file and extract the ledger ID's.
+            while (true) {
+                // Check if we've finished reading the entry log file.
+                if (pos >= bc.size()) {
+                    break;
+                }
+                if (readFromLogChannel(entryLogId, bc, headerBuffer, pos) != 
headerBuffer.capacity()) {
+                    LOG.warn("Short read for entry size from entrylog {}", 
entryLogId);
+                    return;
+                }
+                long offset = pos;
+                pos += 4;
+                int entrySize = headerBuffer.readInt();
+                long ledgerId = headerBuffer.readLong();
+                headerBuffer.clear();
+
+                if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
+                    // skip this entry
+                    pos += entrySize;
+                    continue;
+                }
+                // read the entry
+
+                data.clear();
+                data.capacity(entrySize);
+                int rc = readFromLogChannel(entryLogId, bc, data, pos);
+                if (rc != entrySize) {
+                    LOG.warn("Short read for ledger entry from entryLog {}@{} 
({} != {})",
+                            new Object[] { entryLogId, pos, rc, entrySize });
+                    return;
+                }
+                // process the entry
+                scanner.process(ledgerId, offset, data);
+
+                // Advance position to the next entry
                 pos += entrySize;
-                continue;
-            }
-            // read the entry
-            byte data[] = new byte[entrySize];
-            ByteBuffer buff = ByteBuffer.wrap(data);
-            int rc = readFromLogChannel(entryLogId, bc, buff, pos);
-            if (rc != data.length) {
-                LOG.warn("Short read for ledger entry from entryLog {}@{} ({} 
!= {})", new Object[] { entryLogId, pos,
-                        rc, data.length });
-                return;
             }
-            buff.flip();
-            // process the entry
-            scanner.process(lid, offset, buff);
-            // Advance position to the next entry
-            pos += entrySize;
+        } finally {
+            data.release();
         }
     }
 
@@ -1232,53 +1226,55 @@ public class EntryLogger {
         long offset = header.ledgersMapOffset;
         EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
 
-        while (offset < bc.size()) {
-            // Read ledgers map size
-            ByteBuffer sizeBuf = ByteBuffer.allocate(4);
-            bc.read(sizeBuf, offset);
-            sizeBuf.flip();
+        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+        ByteBuf ledgersMap = ByteBufAllocator.DEFAULT.directBuffer(maxMapSize);
 
-            int ledgersMapSize = sizeBuf.getInt();
+        try {
+            while (offset < bc.size()) {
+                // Read ledgers map size
+                sizeBuffer.get().clear();
+                bc.read(sizeBuffer.get(), offset);
 
-            // Read the index into a buffer
-            ByteBuffer ledgersMapBuffer = ByteBuffer.allocate(ledgersMapSize);
-            bc.read(ledgersMapBuffer, offset + 4);
-            ledgersMapBuffer.flip();
+                int ledgersMapSize = sizeBuffer.get().readInt();
 
-            // Discard ledgerId and entryId
-            long lid = ledgersMapBuffer.getLong();
-            if (lid != INVALID_LID) {
-                throw new IOException("Cannot deserialize ledgers map from 
ledger " + lid + " -- entryLogId: "
-                        + entryLogId);
-            }
+                // Read the index into a buffer
+                ledgersMap.clear();
+                bc.read(ledgersMap, offset + 4, ledgersMapSize);
 
-            long entryId = ledgersMapBuffer.getLong();
-            if (entryId != LEDGERS_MAP_ENTRY_ID) {
-                throw new IOException("Cannot deserialize ledgers map from 
ledger " + lid + ":" + entryId
-                        + " -- entryLogId: " + entryLogId);
-            }
+                // Discard ledgerId and entryId
+                long lid = ledgersMap.readLong();
+                if (lid != INVALID_LID) {
+                    throw new IOException("Cannot deserialize ledgers map from 
ledger " + lid);
+                }
+
+                long entryId = ledgersMap.readLong();
+                if (entryId != LEDGERS_MAP_ENTRY_ID) {
+                    throw new IOException("Cannot deserialize ledgers map from 
entryId " + entryId);
+                }
 
-            // Read the number of ledgers in the current entry batch
-            int ledgersCount = ledgersMapBuffer.getInt();
+                // Read the number of ledgers in the current entry batch
+                int ledgersCount = ledgersMap.readInt();
 
-            // Extract all (ledger,size) tuples from buffer
-            for (int i = 0; i < ledgersCount; i++) {
-                long ledgerId = ledgersMapBuffer.getLong();
-                long size = ledgersMapBuffer.getLong();
+                // Extract all (ledger,size) tuples from buffer
+                for (int i = 0; i < ledgersCount; i++) {
+                    long ledgerId = ledgersMap.readLong();
+                    long size = ledgersMap.readLong();
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Recovering ledgers maps for log {} -- Found 
ledger: {} with size: {}",
-                            new Object[] { entryLogId, ledgerId, size });
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Recovering ledgers maps for log {} -- Found 
ledger: {} with size: {}",
+                                new Object[] { entryLogId, ledgerId, size });
+                    }
+                    meta.addLedgerSize(ledgerId, size);
+                }
+                if (ledgersMap.isReadable()) {
+                    throw new IOException("Invalid entry size when reading 
ledgers map");
                 }
-                meta.addLedgerSize(ledgerId, size);
-            }
 
-            if (ledgersMapBuffer.hasRemaining()) {
-                throw new IOException("Invalid entry size when reading ledgers 
map on entryLogId: " + entryLogId);
+                // Move to next entry, if any
+                offset += ledgersMapSize + 4;
             }
-
-            // Move to next entry, if any
-            offset += ledgersMapSize + 4;
+        } finally {
+            ledgersMap.release();
         }
 
         if (meta.getLedgersMap().size() != header.ledgersCount) {
@@ -1295,9 +1291,9 @@ public class EntryLogger {
         // Read through the entry log file and extract the entry log meta
         scanEntryLog(entryLogId, new EntryLogScanner() {
             @Override
-            public void process(long ledgerId, long offset, ByteBuffer entry) 
throws IOException {
+            public void process(long ledgerId, long offset, ByteBuf entry) 
throws IOException {
                 // add new entry size of a ledger to entry log meta
-                meta.addLedgerSize(ledgerId, entry.limit() + 4);
+                meta.addLedgerSize(ledgerId, entry.readableBytes() + 4);
             }
 
             @Override
@@ -1351,6 +1347,7 @@ public class EntryLogger {
         if (null == channel) {
             return;
         }
+
         FileChannel fileChannel = channel.getFileChannel();
         if (null != fileChannel) {
             fileChannel.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index c8f9483..50e93a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -248,7 +248,7 @@ public class EntryMemTable {
                         ledger = kv.getLedgerId();
                         if (ledgerGC != ledger) {
                             try {
-                                flusher.process(ledger, kv.getEntryId(), 
kv.getValueAsByteBuffer().nioBuffer());
+                                flusher.process(ledger, kv.getEntryId(), 
kv.getValueAsByteBuffer());
                             } catch (NoLedgerException exception) {
                                 ledgerGC = ledger;
                             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6db3e6d..2b357ee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -26,16 +26,18 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENT
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
 import com.google.common.collect.Lists;
+
 import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Observable;
 import java.util.Observer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -263,7 +265,7 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
         long entryId = entry.getLong(entry.readerIndex() + 8);
         long lac = entry.getLong(entry.readerIndex() + 16);
 
-        processEntry(ledgerId, entryId, entry.nioBuffer());
+        processEntry(ledgerId, entryId, entry);
 
         ledgerCache.updateLastAddConfirmed(ledgerId, lac);
         return entryId;
@@ -409,11 +411,11 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
         ledgerDeletionListeners.add(listener);
     }
 
-    protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) 
throws IOException {
+    protected void processEntry(long ledgerId, long entryId, ByteBuf entry) 
throws IOException {
         processEntry(ledgerId, entryId, entry, true);
     }
 
-    protected synchronized void processEntry(long ledgerId, long entryId, 
ByteBuffer entry, boolean rollLog)
+    protected synchronized void processEntry(long ledgerId, long entryId, 
ByteBuf entry, boolean rollLog)
             throws IOException {
         /*
          * Touch dirty flag
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d3c2144..faac1b4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.base.Stopwatch;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
@@ -39,6 +40,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -49,7 +51,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZeroBuffer;
 import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
 import org.apache.bookkeeper.util.collections.RecyclableArrayList;
 import org.slf4j.Logger;
@@ -291,7 +292,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, 
WriteCallback cb, Object ctx,
                 long enqueueTime, OpStatsLogger journalAddEntryStats) {
             QueueEntry qe = RECYCLER.get();
-            qe.entry = entry.duplicate();
+            qe.entry = entry;
             qe.cb = cb;
             qe.ctx = ctx;
             qe.ledgerId = ledgerId;
@@ -508,7 +509,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
     static final int PADDING_MASK = -0x100;
 
-    static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer, 
int journalAlignSize)
+    static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, 
int journalAlignSize)
             throws IOException {
         int bytesToAlign = (int) (jc.bc.position() % journalAlignSize);
         if (0 != bytesToAlign) {
@@ -520,14 +521,13 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
             }
             paddingBuffer.clear();
             // padding mask
-            paddingBuffer.putInt(PADDING_MASK);
+            paddingBuffer.writeInt(PADDING_MASK);
             // padding len
-            paddingBuffer.putInt(paddingBytes);
+            paddingBuffer.writeInt(paddingBytes);
             // padding bytes
-            paddingBuffer.position(8 + paddingBytes);
+            paddingBuffer.writerIndex(paddingBuffer.writerIndex() + 
paddingBytes);
 
-            paddingBuffer.flip();
-            jc.preAllocIfNeeded(paddingBuffer.limit());
+            jc.preAllocIfNeeded(paddingBuffer.readableBytes());
             // write padding bytes
             jc.bc.write(paddingBuffer);
         }
@@ -860,9 +860,9 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         LOG.info("Starting journal on {}", journalDirectory);
 
         RecyclableArrayList<QueueEntry> toFlush = 
entryListRecycler.newInstance();
-        ByteBuffer lenBuff = ByteBuffer.allocate(4);
-        ByteBuffer paddingBuff = ByteBuffer.allocate(2 * 
conf.getJournalAlignmentSize());
-        ZeroBuffer.put(paddingBuff);
+        ByteBuf lenBuff = Unpooled.buffer(4);
+        ByteBuf paddingBuff = Unpooled.buffer(2 * 
conf.getJournalAlignmentSize());
+        paddingBuff.writeZero(paddingBuff.capacity());
         final int journalFormatVersionToWrite = 
conf.getJournalFormatVersionToWrite();
         final int journalAlignmentSize = conf.getJournalAlignmentSize();
         JournalChannel logFile = null;
@@ -1020,24 +1020,20 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                     continue;
                 }
 
-                journalWriteBytes.add(qe.entry.readableBytes());
+                int entrySize = qe.entry.readableBytes();
+                journalWriteBytes.add(entrySize);
                 journalQueueSize.dec();
 
-                batchSize += (4 + qe.entry.readableBytes());
+                batchSize += (4 + entrySize);
 
                 lenBuff.clear();
-                lenBuff.putInt(qe.entry.readableBytes());
-                lenBuff.flip();
+                lenBuff.writeInt(entrySize);
 
                 // preAlloc based on size
-                logFile.preAllocIfNeeded(4 + qe.entry.readableBytes());
+                logFile.preAllocIfNeeded(4 + entrySize);
 
-                //
-                // we should be doing the following, but then we run out of
-                // direct byte buffers
-                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
                 bc.write(lenBuff);
-                bc.write(qe.entry.nioBuffer());
+                bc.write(qe.entry);
                 qe.entry.release();
 
                 toFlush.add(qe);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
index 96fbbe1..abfb264 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
@@ -21,8 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /**
  * Flush entries from skip list.
@@ -36,5 +37,5 @@ public interface SkipListFlusher {
      * @param entry Entry ByteBuffer
      * @throws IOException
      */
-    void process(long ledgerId, long entryId, ByteBuffer entry) throws 
IOException;
+    void process(long ledgerId, long entryId, ByteBuf entry) throws 
IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 972f085..b0c6bad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -177,7 +176,7 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
 
     @Override
     public void process(long ledgerId, long entryId,
-                        ByteBuffer buffer) throws IOException {
+                        ByteBuf buffer) throws IOException {
         processEntry(ledgerId, entryId, buffer, false);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index e5fef35..67c1106 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -21,9 +21,10 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -172,18 +173,17 @@ public class TransactionalEntryLogCompactor extends 
AbstractLogCompactor {
                 }
 
                 @Override
-                public void process(long ledgerId, long offset, ByteBuffer 
entry) throws IOException {
-                    throttler.acquire(entry.remaining());
+                public void process(long ledgerId, long offset, ByteBuf entry) 
throws IOException {
+                    throttler.acquire(entry.readableBytes());
                     synchronized (TransactionalEntryLogCompactor.this) {
-                        long lid = entry.getLong();
-                        long entryId = entry.getLong();
+                        long lid = entry.getLong(entry.readerIndex());
+                        long entryId = entry.getLong(entry.readerIndex() + 8);
                         if (lid != ledgerId || entryId < -1) {
                             LOG.warn("Scanning expected ledgerId {}, but found 
invalid entry "
                                     + "with ledgerId {} entryId {} at offset 
{}",
                                 new Object[]{ledgerId, lid, entryId, offset});
                             throw new IOException("Invalid entry found @ 
offset " + offset);
                         }
-                        entry.rewind();
                         long newOffset = 
entryLogger.addEntryForCompaction(ledgerId, entry);
                         offsets.add(new EntryLocation(ledgerId, entryId, 
newOffset));
 
@@ -356,16 +356,15 @@ public class TransactionalEntryLogCompactor extends 
AbstractLogCompactor {
                 }
 
                 @Override
-                public void process(long ledgerId, long offset, ByteBuffer 
entry) throws IOException {
-                    long lid = entry.getLong();
-                    long entryId = entry.getLong();
+                public void process(long ledgerId, long offset, ByteBuf entry) 
throws IOException {
+                    long lid = entry.getLong(entry.readerIndex());
+                    long entryId = entry.getLong(entry.readerIndex() + 8);
                     if (lid != ledgerId || entryId < -1) {
                         LOG.warn("Scanning expected ledgerId {}, but found 
invalid entry "
                                 + "with ledgerId {} entryId {} at offset {}",
                             new Object[]{ledgerId, lid, entryId, offset});
                         throw new IOException("Invalid entry found @ offset " 
+ offset);
                     }
-                    entry.rewind();
                     long location = (compactedLogId << 32L) | (offset + 4);
                     offsets.add(new EntryLocation(lid, entryId, location));
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index d3ae746..e55dca2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -121,11 +121,6 @@ public final class DoubleByteBuf extends 
AbstractReferenceCountedByteBuf {
     }
 
     @Override
-    public int readableBytes() {
-        return b1.readableBytes() + b2.readableBytes();
-    }
-
-    @Override
     public int writableBytes() {
         return 0;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 8332b34..6f75bbe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -20,6 +20,11 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -29,8 +34,8 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.bookkeeper.client.ClientUtil;
@@ -38,14 +43,11 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.ZeroBuffer;
 import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.Test;
-import org.junit.After;
-
-import static org.junit.Assert.*;
 
 public class BookieJournalTest {
     private final static Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
@@ -105,11 +107,10 @@ public class BookieJournalTest {
     /**
      * Generate fence entry
      */
-    private ByteBuffer generateFenceEntry(long ledgerId) {
-        ByteBuffer bb = ByteBuffer.allocate(8 + 8);
-        bb.putLong(ledgerId);
-        bb.putLong(Bookie.METAENTRY_ID_FENCE_KEY);
-        bb.flip();
+    private ByteBuf generateFenceEntry(long ledgerId) {
+        ByteBuf bb = Unpooled.buffer();
+        bb.writeLong(ledgerId);
+        bb.writeLong(Bookie.METAENTRY_ID_FENCE_KEY);
         return bb;
     }
 
@@ -117,13 +118,12 @@ public class BookieJournalTest {
      * Generate meta entry with given master key
      */
     private ByteBuf generateMetaEntry(long ledgerId, byte[] masterKey) {
-        ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
-        bb.putLong(ledgerId);
-        bb.putLong(Bookie.METAENTRY_ID_LEDGER_KEY);
-        bb.putInt(masterKey.length);
-        bb.put(masterKey);
-        bb.flip();
-        return Unpooled.wrappedBuffer(bb);
+        ByteBuf bb = Unpooled.buffer();
+        bb.writeLong(ledgerId);
+        bb.writeLong(Bookie.METAENTRY_ID_LEDGER_KEY);
+        bb.writeInt(masterKey.length);
+        bb.writeBytes(masterKey);
+        return bb;
     }
 
     private void writeJunkJournal(File journalDir) throws Exception {
@@ -204,8 +204,8 @@ public class BookieJournalTest {
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         bc.flush(true);
@@ -238,8 +238,8 @@ public class BookieJournalTest {
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         bc.flush(true);
@@ -271,15 +271,14 @@ public class BookieJournalTest {
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         // write fence key
-        ByteBuffer packet = generateFenceEntry(1);
-        ByteBuffer lenBuf = ByteBuffer.allocate(4);
-        lenBuf.putInt(packet.remaining());
-        lenBuf.flip();
+        ByteBuf packet = generateFenceEntry(1);
+        ByteBuf lenBuf = Unpooled.buffer();
+        lenBuf.writeInt(packet.readableBytes());
         bc.write(lenBuf);
         bc.write(packet);
         bc.flush(true);
@@ -293,8 +292,8 @@ public class BookieJournalTest {
 
         BufferedChannel bc = jc.getBufferedChannel();
 
-        ByteBuffer paddingBuff = ByteBuffer.allocateDirect(2 * JournalChannel.SECTOR_SIZE);
-        ZeroBuffer.put(paddingBuff);
+        ByteBuf paddingBuff = Unpooled.buffer();
+        paddingBuff.writeZero(2 * JournalChannel.SECTOR_SIZE);
         byte[] data = new byte[4 * 1024 * 1024];
         Arrays.fill(data, (byte)'X');
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
@@ -308,19 +307,17 @@ public class BookieJournalTest {
             }
             lastConfirmed = i;
             length += i;
-            ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.readableBytes());
-            lenBuff.flip();
+            ByteBuf lenBuff = Unpooled.buffer();
+            lenBuff.writeInt(packet.readableBytes());
             bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(packet);
             packet.release();
             Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
         }
         // write fence key
-        ByteBuffer packet = generateFenceEntry(1);
-        ByteBuffer lenBuf = ByteBuffer.allocate(4);
-        lenBuf.putInt(packet.remaining());
-        lenBuf.flip();
+        ByteBuf packet = generateFenceEntry(1);
+        ByteBuf lenBuf = Unpooled.buffer();
+        lenBuf.writeInt(packet.readableBytes());
         bc.write(lenBuf);
         bc.write(packet);
         Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
@@ -519,7 +516,7 @@ public class BookieJournalTest {
         Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
 
         JournalChannel jc = writeV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
-        jc.getBufferedChannel().write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+        jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
         jc.getBufferedChannel().flush(true);
 
         writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 8de2bfc..144871c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,12 +21,21 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -35,15 +44,8 @@ import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.bookkeeper.util.IOUtils;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -52,8 +54,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 /**
  * LedgerCache related test cases
  */
@@ -499,7 +499,7 @@ public class LedgerCacheTest {
         }
 
         @Override
-        public void process(long ledgerId, long entryId, ByteBuffer buffer) throws IOException {
+        public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException {
             if (injectFlushException.get()) {
                 throw new IOException("Injected Exception");
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index ec60799..7606c91 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -22,17 +22,20 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Test;
 import org.junit.Before;
+import org.junit.Test;
 
 public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
 
@@ -97,7 +100,7 @@ public class TestEntryMemTable implements CacheCallback, SkipListFlusher, Checkp
         // No-op
     }
 
-    public void process(long ledgerId, long entryId, ByteBuffer entry)
+    public void process(long ledgerId, long entryId, ByteBuf entry)
             throws IOException {
         // No-op
     }
@@ -135,7 +138,7 @@ public class TestEntryMemTable implements CacheCallback, SkipListFlusher, Checkp
         }
 
         @Override
-        public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+        public void process(long ledgerId, long entryId, ByteBuf entry) throws IOException {
             assertTrue(ledgerId + ":" + entryId + " is duplicate in store!",
                     keyValues.add(new EntryKeyValue(ledgerId, entryId, entry.array())));
         }
@@ -143,7 +146,7 @@ public class TestEntryMemTable implements CacheCallback, SkipListFlusher, Checkp
 
     private class NoLedgerFLusher implements SkipListFlusher {
         @Override
-        public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+        public void process(long ledgerId, long entryId, ByteBuf entry) throws IOException {
             throw new NoLedgerException(ledgerId);
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index fba9594..00e3d15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.bookie;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -96,8 +97,8 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         bc.flush(true);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 824f72f..270a60f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.nio.ByteBuffer;
@@ -121,4 +122,28 @@ public class DoubleByteBufTest {
 
         assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
     }
+
+    /**
+     * Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer
+     * and readerIndex is increased by 64.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testReadableBytes() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBuf buf = DoubleByteBuf.get(b1, b2);
+
+        assertEquals(buf.readerIndex(), 0);
+        assertEquals(buf.writerIndex(), 256);
+        assertEquals(buf.readableBytes(), 256);
+
+        for (int i = 0; i < 4; ++i) {
+            buf.skipBytes(64);
+            assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to