This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new df71247  Replace DoubleByteBuf with ByteBufList
df71247 is described below

commit df71247a8c9644bbeda7d18d833fde4bef88fb3d
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Feb 18 04:08:20 2018 -0800

    Replace DoubleByteBuf with ByteBufList
    
    This PR aims at a better approach to fix the issue reported in #1108
    
    Instead of using `CompositeByteBuf`, which has the downside of needing 
multiple object allocations (~7-8) per usage, we should have a way to 
aggregate multiple buffers into a single "logical" frame with no additional 
overhead.
    
    This PR introduces a `ByteBufList` class which is just a holder of a list 
of buffers, without imitating the `ByteBuf` API. You can pass a `ByteBufList` 
to the channel and have the encoder take care of the multiple writes.
    
    Instances of ByteBufList are ref-counted and released to a pool after the 
write is completed. There are no object allocations or memory copies involved.
    
    Author: Matteo Merli <[email protected]>
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Venkateswararao Jujjuri 
(JV) <None>
    
    This closes #1141 from merlimat/bytebuf-list-master
---
 .../apache/bookkeeper/benchmark/BenchBookie.java   |   7 +-
 .../bookkeeper/client/ExplicitLacFlushPolicy.java  |   6 +-
 .../client/LedgerFragmentReplicator.java           |   8 +-
 .../org/apache/bookkeeper/client/PendingAddOp.java |   3 +-
 .../bookkeeper/client/PendingWriteLacOp.java       |   7 +-
 .../org/apache/bookkeeper/proto/BookieClient.java  |  11 +-
 .../apache/bookkeeper/proto/BookieNettyServer.java |   4 +
 .../bookkeeper/proto/BookieProtoEncoding.java      |  13 +-
 .../apache/bookkeeper/proto/BookieProtocol.java    |  63 ++-
 .../bookkeeper/proto/PerChannelBookieClient.java   |  11 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      |   4 +-
 .../bookkeeper/proto/checksum/DigestManager.java   |  14 +-
 .../org/apache/bookkeeper/util/ByteBufList.java    | 304 +++++++++++++++
 .../org/apache/bookkeeper/util/DoubleByteBuf.java  |  39 --
 .../org/apache/bookkeeper/client/ClientUtil.java   |  15 +-
 .../bookkeeper/client/LedgerHandleAdapter.java     |   9 +
 .../bookkeeper/client/MockBookKeeperTestCase.java  |  14 +-
 .../client/ParallelLedgerRecoveryTest.java         |   3 +-
 .../apache/bookkeeper/test/BookieClientTest.java   |   7 +-
 .../apache/bookkeeper/util/ByteBufListTest.java    | 422 +++++++++++++++++++++
 .../apache/bookkeeper/util/DoubleByteBufTest.java  | 197 ----------
 .../proto/checksum/DigestTypeBenchmark.java        |   9 +-
 22 files changed, 871 insertions(+), 299 deletions(-)

diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 08f548c..db8ccbf 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -186,7 +187,7 @@ public class BenchBookie {
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new 
byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                    entry, ByteBufList.get(toSend), tc, null, 
BookieProtocol.FLAG_NONE);
         }
         LOG.info("Waiting for warmup");
         tc.waitFor(warmUpCount);
@@ -203,7 +204,7 @@ public class BenchBookie {
             toSend.writerIndex(toSend.capacity());
             lc.resetComplete();
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new 
byte[20],
-                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+                    entry, ByteBufList.get(toSend), lc, null, 
BookieProtocol.FLAG_NONE);
             lc.waitForComplete();
         }
         long endTime = System.nanoTime();
@@ -221,7 +222,7 @@ public class BenchBookie {
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new 
byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                    entry, ByteBufList.get(toSend), tc, null, 
BookieProtocol.FLAG_NONE);
         }
         tc.waitFor(throughputCount);
         endTime = System.currentTimeMillis();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index 5acaeae..c77843a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -20,13 +20,12 @@
  */
 package org.apache.bookkeeper.client;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.SyncCallbackUtils.LastAddConfirmedCallback;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,7 +138,8 @@ interface ExplicitLacFlushPolicy {
                 lh.bk.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
-                        ByteBuf toSend = 
lh.macManager.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
+                        ByteBufList toSend = lh.macManager
+                                
.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
                         op.initiate(toSend);
                     }
                 });
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 31d8e21..5c2ff8c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -21,8 +21,8 @@ package org.apache.bookkeeper.client;
 
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -44,6 +45,7 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import 
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
@@ -313,13 +315,13 @@ public class LedgerFragmentReplicator {
                 final long dataLength = data.length;
                 numEntriesRead.inc();
                 numBytesRead.registerSuccessfulValue(dataLength);
-                ByteBuf toSend = lh.getDigestManager()
+                ByteBufList toSend = lh.getDigestManager()
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
                                 Unpooled.wrappedBuffer(data, 0, data.length));
                 for (BookieSocketAddress newBookie : newBookies) {
                     bkc.getBookieClient().addEntry(newBookie, lh.getId(),
-                            lh.getLedgerKey(), entryId, toSend.retainedSlice(),
+                            lh.getLedgerKey(), entryId, 
ByteBufList.clone(toSend),
                             multiWriteCallback, dataLength, 
BookieProtocol.FLAG_RECOVERY_ADD);
                 }
                 toSend.release();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 639d042..adf67a8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingAddOp.class);
 
     ByteBuf payload;
-    ByteBuf toSend;
+    ByteBufList toSend;
     AddCallbackWithLatency cb;
     Object ctx;
     long entryId;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 950959b..af45e29 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -17,14 +17,13 @@
  */
 package org.apache.bookkeeper.client;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.BitSet;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 class PendingWriteLacOp implements WriteLacCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingWriteLacOp.class);
-    ByteBuf toSend;
+    ByteBufList toSend;
     AddLacCallback cb;
     long lac;
     Object ctx;
@@ -75,7 +74,7 @@ class PendingWriteLacOp implements WriteLacCallback {
                 lac, toSend, this, bookieIndex);
     }
 
-    void initiate(ByteBuf toSend) {
+    void initiate(ByteBufList toSend) {
         this.toSend = toSend;
         DistributionSchedule.WriteSet writeSet = 
lh.distributionSchedule.getWriteSet(lac);
         try {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 08ed194..1623324 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
@@ -187,7 +188,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
     }
 
     public void writeLac(final BookieSocketAddress addr, final long ledgerId, 
final byte[] masterKey,
-            final long lac, final ByteBuf toSend, final WriteLacCallback cb, 
final Object ctx) {
+            final long lac, final ByteBufList toSend, final WriteLacCallback 
cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr);
@@ -250,7 +251,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                          final long ledgerId,
                          final byte[] masterKey,
                          final long entryId,
-                         final ByteBuf toSend,
+                         final ByteBufList toSend,
                          final WriteCallback cb,
                          final Object ctx,
                          final int options) {
@@ -300,7 +301,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
 
         private BookieClient bookieClient;
-        private ByteBuf toSend;
+        private ByteBufList toSend;
         private long ledgerId;
         private long entryId;
         private BookieSocketAddress addr;
@@ -310,7 +311,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
         private byte[] masterKey;
 
         static ChannelReadyForAddEntryCallback create(
-                BookieClient bookieClient, ByteBuf toSend, long ledgerId,
+                BookieClient bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieSocketAddress addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
@@ -619,7 +620,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
 
         for (int i = 0; i < 100000; i++) {
             counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i, 
Unpooled.wrappedBuffer(hello), cb, counter, 0);
+            bc.addEntry(addr, ledger, new byte[0], i, 
ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 3c5385c..83a3777 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -73,6 +73,7 @@ import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.commons.lang.SystemUtils;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -302,6 +303,9 @@ class BookieNettyServer {
                         new BookieSideConnectionPeerContextHandler();
                     ChannelPipeline pipeline = ch.pipeline();
 
+                    // For ByteBufList, skip the usual LengthFieldPrepender 
and have the encoder itself add it
+                    pipeline.addLast("bytebufList", 
ByteBufList.ENCODER_WITH_SIZE);
+
                     pipeline.addLast("lengthbaseddecoder", new 
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
                     pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 33bb675..24ef117 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -41,7 +41,7 @@ import java.util.List;
 
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
-import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,9 +111,10 @@ public class BookieProtoEncoding {
                 ByteBuf buf = allocator.buffer(totalHeaderSize);
                 buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), r.getFlags()));
                 buf.writeBytes(r.getMasterKey(), 0, 
BookieProtocol.MASTER_KEY_LENGTH);
-                ByteBuf data = ar.getData();
+                ByteBufList data = ar.getData();
                 ar.recycle();
-                return DoubleByteBuf.get(buf, data);
+                data.prepend(buf);
+                return data;
             } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
@@ -166,7 +167,7 @@ public class BookieProtoEncoding {
                 // Read ledger and entry id without advancing the reader index
                 ledgerId = packet.getLong(packet.readerIndex());
                 entryId = packet.getLong(packet.readerIndex() + 8);
-                return BookieProtocol.AddRequest.create(
+                return BookieProtocol.ParsedAddRequest.create(
                         version, ledgerId, entryId, flags,
                         masterKey, packet.retain());
             }
@@ -246,7 +247,7 @@ public class BookieProtoEncoding {
 
                     BookieProtocol.ReadResponse rr = 
(BookieProtocol.ReadResponse) r;
                     if (rr.hasData()) {
-                        return DoubleByteBuf.get(buf, rr.getData());
+                        return ByteBufList.get(buf, rr.getData());
                     } else {
                         return buf;
                     }
@@ -258,7 +259,7 @@ public class BookieProtoEncoding {
                     return buf;
                 } else if (msg instanceof BookieProtocol.AuthResponse) {
                     BookkeeperProtocol.AuthMessage am = 
((BookieProtocol.AuthResponse) r).getAuthMessage();
-                    return DoubleByteBuf.get(buf, 
Unpooled.wrappedBuffer(am.toByteArray()));
+                    return ByteBufList.get(buf, 
Unpooled.wrappedBuffer(am.toByteArray()));
                 } else {
                     LOG.error("Cannot encode unknown response type {}", 
msg.getClass().getName());
                     return msg;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4ad23d9..3136c57 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -26,6 +26,7 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
 
 /**
  * The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -244,11 +245,11 @@ public interface BookieProtocol {
      * A Request that adds data.
      */
     class AddRequest extends Request {
-        ByteBuf data;
+        ByteBufList data;
 
         static AddRequest create(byte protocolVersion, long ledgerId,
                                  long entryId, short flags, byte[] masterKey,
-                                 ByteBuf data) {
+                                 ByteBufList data) {
             AddRequest add = RECYCLER.get();
             add.protocolVersion = protocolVersion;
             add.opCode = ADDENTRY;
@@ -260,8 +261,9 @@ public interface BookieProtocol {
             return add;
         }
 
-        ByteBuf getData() {
-            return data;
+        ByteBufList getData() {
+            // We need to have different ByteBufList instances for each bookie 
write
+            return ByteBufList.clone(data);
         }
 
         boolean isRecoveryAdd() {
@@ -294,6 +296,59 @@ public interface BookieProtocol {
     }
 
     /**
+     * This is similar to an add request, but it is used when processing the request 
on the bookie side.
+     */
+    class ParsedAddRequest extends Request {
+        ByteBuf data;
+
+        static ParsedAddRequest create(byte protocolVersion, long ledgerId, 
long entryId, short flags, byte[] masterKey,
+                ByteBuf data) {
+            ParsedAddRequest add = RECYCLER.get();
+            add.protocolVersion = protocolVersion;
+            add.opCode = ADDENTRY;
+            add.ledgerId = ledgerId;
+            add.entryId = entryId;
+            add.flags = flags;
+            add.masterKey = masterKey;
+            add.data = data.retain();
+            return add;
+        }
+
+        ByteBuf getData() {
+            // Return the payload buffer itself; unlike AddRequest, no clone 
is made here
+            return data;
+        }
+
+        boolean isRecoveryAdd() {
+            return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
+        }
+
+        void release() {
+            data.release();
+        }
+
+        private final Handle<ParsedAddRequest> recyclerHandle;
+        private ParsedAddRequest(Handle<ParsedAddRequest> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<ParsedAddRequest> RECYCLER = new 
Recycler<ParsedAddRequest>() {
+            protected ParsedAddRequest newObject(Handle<ParsedAddRequest> 
handle) {
+                return new ParsedAddRequest(handle);
+            }
+        };
+
+        @Override
+        public void recycle() {
+            ledgerId = -1;
+            entryId = -1;
+            masterKey = null;
+            data = null;
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    /**
      * A Request that reads data.
      */
     class ReadRequest extends Request {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 75b3193..666b644 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -116,6 +116,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -423,6 +424,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             protected void initChannel(Channel ch) throws Exception {
                 ChannelPipeline pipeline = ch.pipeline();
 
+                pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
                 pipeline.addLast("lengthbasedframedecoder",
                         new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 
0, 4));
                 pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
@@ -494,7 +496,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
 
     }
 
-    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, 
ByteBuf toSend, WriteLacCallback cb,
+    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, 
ByteBufList toSend, WriteLacCallback cb,
             Object ctx) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new V3CompletionKey(txnId,
@@ -513,7 +515,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 .setLedgerId(ledgerId)
                 .setLac(lac)
                 .setMasterKey(ByteString.copyFrom(masterKey))
-                .setBody(ByteString.copyFrom(toSend.nioBuffer()));
+                .setBody(ByteString.copyFrom(toSend.toArray()));
 
         final Request writeLacRequest = Request.newBuilder()
                 .setHeader(headerBuilder)
@@ -541,7 +543,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
      * @param options
      *          Add options
      */
-    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ByteBuf toSend, WriteCallback cb,
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ByteBufList toSend, WriteCallback cb,
                   Object ctx, final int options) {
         Object request = null;
         CompletionKey completionKey = null;
@@ -560,8 +562,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                     .setOperation(OperationType.ADD_ENTRY)
                     .setTxnId(txnId);
 
-            byte[] toSendArray = new byte[toSend.readableBytes()];
-            toSend.getBytes(toSend.readerIndex(), toSendArray);
+            byte[] toSendArray = toSend.toArray();
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 34e2c2c..164a11b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -56,8 +56,8 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
 
     @Override
     protected void processPacket() {
-        assert (request instanceof BookieProtocol.AddRequest);
-        BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
+        assert (request instanceof BookieProtocol.ParsedAddRequest);
+        BookieProtocol.ParsedAddRequest add = 
(BookieProtocol.ParsedAddRequest) request;
 
         if (requestProcessor.bookie.isReadOnly()) {
             LOG.warn("BookieServer is running in readonly mode,"
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index cf6b315..2f85e4f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto.checksum;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,6 +15,7 @@ package org.apache.bookkeeper.proto.checksum;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -27,7 +26,7 @@ import java.security.GeneralSecurityException;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +87,8 @@ public abstract class DigestManager {
      * @param data
      * @return
      */
-    public ByteBuf computeDigestAndPackageForSending(long entryId, long 
lastAddConfirmed, long length, ByteBuf data) {
+    public ByteBufList computeDigestAndPackageForSending(long entryId, long 
lastAddConfirmed, long length,
+            ByteBuf data) {
         ByteBuf headersBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(entryId);
@@ -99,7 +99,7 @@ public abstract class DigestManager {
         update(data);
         populateValueAndReset(headersBuffer);
 
-        return DoubleByteBuf.get(headersBuffer, data);
+        return ByteBufList.get(headersBuffer, data);
     }
 
     /**
@@ -109,7 +109,7 @@ public abstract class DigestManager {
      * @return
      */
 
-    public ByteBuf computeDigestAndPackageForSendingLac(long lac) {
+    public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
         ByteBuf headersBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(lac);
@@ -117,7 +117,7 @@ public abstract class DigestManager {
         update(headersBuffer);
         populateValueAndReset(headersBuffer);
 
-        return headersBuffer;
+        return ByteBufList.get(headersBuffer);
     }
 
     private void verifyDigest(ByteBuf dataReceived) throws 
BKDigestMatchException {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
new file mode 100644
index 0000000..37d4c72
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
+
+import java.util.ArrayList;
+
+/**
+ * ByteBufList is a holder of a sequence of {@link ByteBuf} objects.
+ *
+ * <p>This class doesn't try to mimic the {@link ByteBuf} API, but rather 
exposes itself just like a regular object which
+ * will need to be encoded on the channel. There are 2 utility encoders:
+ * <ul>
+ * <li>{@link #ENCODER}: regular encoder that writes all the buffers in the 
{@link ByteBufList} on the channel</li>
+ * <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also 
prepends a 4-byte size header, once, carrying
+ * the size of the readable bytes across all the buffers contained in the 
{@link ByteBufList}</li>
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * bootstrap.handler(new ChannelInitializer&lt;SocketChannel&gt;() {
+ *     public void initChannel(SocketChannel ch) throws Exception {
+ *         ChannelPipeline pipeline = ch.pipeline();
+ *         pipeline.addLast("bytebufList", ByteBufList.ENCODER);
+ *         pipeline.addLast("mainhandler", new MyHandler());
+ *     }
+ * });
+ * </pre>
+ *
+ * <p>ByteBufList is pooling the instances and uses ref-counting to release 
them.
+ */
+public class ByteBufList extends AbstractReferenceCounted {
+    private final ArrayList<ByteBuf> buffers;
+    private final Handle<ByteBufList> recyclerHandle;
+
+    private static final int INITIAL_LIST_SIZE = 4;
+
+    private static final Recycler<ByteBufList> RECYCLER = new 
Recycler<ByteBufList>() {
+        @Override
+        protected ByteBufList newObject(Recycler.Handle<ByteBufList> handle) {
+            return new ByteBufList(handle);
+        }
+    };
+
+    private ByteBufList(Handle<ByteBufList> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+        this.buffers = new ArrayList<>(INITIAL_LIST_SIZE);
+    }
+
+    /**
+     * Get a new {@link ByteBufList} from the pool and assign 2 buffers to it.
+     *
+     * <p>The buffers b1 and b2 lifecycles are now managed by the ByteBufList: 
when the {@link ByteBufList} is
+     * deallocated, b1 and b2 will be released as well.
+     *
+     * @param b1
+     *            first buffer
+     * @param b2
+     *            second buffer
+     * @return a {@link ByteBufList} instance from the pool
+     */
+    public static ByteBufList get(ByteBuf b1, ByteBuf b2) {
+        ByteBufList buf = get();
+        buf.add(b1);
+        buf.add(b2);
+        return buf;
+    }
+
+    /**
+     * Get a new {@link ByteBufList} from the pool and assign 1 buffer to it.
+     *
+     * <p>The buffer b1 lifecycle is now managed by the ByteBufList: when the 
{@link ByteBufList} is
+     * deallocated, b1 will be released as well.
+     *
+     * @param b1
+     *            first buffer
+     * @return a {@link ByteBufList} instance from the pool
+     */
+    public static ByteBufList get(ByteBuf b1) {
+        ByteBufList buf = get();
+        buf.add(b1);
+        return buf;
+    }
+
+    /**
+     * Get a new {@link ByteBufList} instance from the pool that is the clone 
of an already existing instance.
+     */
+    public static ByteBufList clone(ByteBufList other) {
+        ByteBufList buf = get();
+        for (int i = 0; i < other.buffers.size(); i++) {
+            // Create a duplicate of the buffer so that there is no 
interference from other threads
+            buf.add(other.buffers.get(i).retainedDuplicate());
+        }
+        return buf;
+    }
+
+    private static ByteBufList get() {
+        ByteBufList buf = RECYCLER.get();
+        buf.setRefCnt(1);
+        return buf;
+    }
+
+    /**
+     * Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
+     */
+    public void add(ByteBuf buf) {
+        buffers.add(buf);
+    }
+
+    /**
+     * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
+     */
+    public void prepend(ByteBuf buf) {
+        buffers.add(0, buf);
+    }
+
+    /**
+     * @return the total amount of readable bytes across all the {@link 
ByteBuf} included in the list
+     */
+    public int readableBytes() {
+        int readableBytes = 0;
+        for (int i = 0; i < buffers.size(); i++) {
+            readableBytes += buffers.get(i).readableBytes();
+        }
+        return readableBytes;
+    }
+
+    /**
+     * Get access to a particular buffer in the list.
+     *
+     * @param index
+     *            the index of the buffer
+     * @return the buffer
+     */
+    public ByteBuf getBuffer(int index) {
+        return buffers.get(index);
+    }
+
+    /**
+     * @return the number of buffers included in the {@link ByteBufList}
+     */
+    public int size() {
+        return buffers.size();
+    }
+
+    /**
+     * Write bytes from the current {@link ByteBufList} into a byte array.
+     *
+     * <p>This won't modify the reader index of the internal buffers.
+     *
+     * @param dst
+     *            the destination byte array
+     * @return the number of copied bytes
+     */
+    public int getBytes(byte[] dst) {
+        int copied = 0;
+        for (int idx = 0; idx < buffers.size() && copied < dst.length; idx++) {
+            ByteBuf b = buffers.get(idx);
+            int len = Math.min(b.readableBytes(), dst.length - copied);
+            b.getBytes(b.readerIndex(), dst, copied, len);
+
+            copied += len;
+        }
+
+        return copied;
+    }
+
+    /**
+     * @return an array containing all the internal buffers content
+     */
+    public byte[] toArray() {
+        byte[] a = new byte[readableBytes()];
+        getBytes(a);
+        return a;
+    }
+
+    /**
+     * @return a new unpooled buffer holding a copy of the content of all the buffers in the list
+     */
+    @VisibleForTesting
+    public static ByteBuf coalesce(ByteBufList list) {
+        ByteBuf res = Unpooled.buffer(list.readableBytes());
+        for (int i = 0; i < list.buffers.size(); i++) {
+            ByteBuf b = list.buffers.get(i);
+            res.writeBytes(b, b.readerIndex(), b.readableBytes());
+        }
+
+        return res;
+    }
+
+    @Override
+    public ByteBufList retain() {
+        super.retain();
+        return this;
+    }
+
+    @Override
+    protected void deallocate() {
+        for (int i = 0; i < buffers.size(); i++) {
+            buffers.get(i).release();
+        }
+
+        buffers.clear();
+        recyclerHandle.recycle(this);
+    }
+
+    @Override
+    public ReferenceCounted touch(Object hint) {
+        for (int i = 0; i < buffers.size(); i++) {
+            buffers.get(i).touch(hint);
+        }
+        return this;
+    }
+
+    /**
+     * Encoder for the {@link ByteBufList} that doesn't prepend any size 
header.
+     */
+    public static final Encoder ENCODER = new Encoder(false);
+
+    /**
+     * Encoder for the {@link ByteBufList} that will prepend a 4 byte header 
with the size of the whole
+     * {@link ByteBufList} readable bytes.
+     */
+    public static final Encoder ENCODER_WITH_SIZE = new Encoder(true);
+
+    /**
+     * {@link ByteBufList} encoder.
+     */
+    @Sharable
+    public static class Encoder extends ChannelOutboundHandlerAdapter {
+
+        private final boolean prependSize;
+
+        public Encoder(boolean prependSize) {
+            this.prependSize = prependSize;
+        }
+
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
+            if (msg instanceof ByteBufList) {
+                ByteBufList b = (ByteBufList) msg;
+
+                try {
+                    if (prependSize) {
+                        // Prepend the frame size before writing the buffer 
list, so that we only have 1 single size
+                        // header
+                        ByteBuf sizeBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(4, 4);
+                        sizeBuffer.writeInt(b.readableBytes());
+                        ctx.write(sizeBuffer, ctx.voidPromise());
+                    }
+
+                    // Write each buffer individually on the socket. The 
retainedDuplicate() here is needed to preserve the fact
+                    // that ByteBuf are automatically released after a write. 
If the ByteBufList ref count is increased
+                    // and it gets written multiple times, the individual 
buffers refcount should be reflected as well.
+                    int buffersCount = b.buffers.size();
+                    for (int i = 0; i < buffersCount; i++) {
+                        ByteBuf bx = b.buffers.get(i);
+                        // Last buffer will carry on the final promise to 
notify when everything was written on the
+                        // socket
+                        ctx.write(bx.retainedDuplicate(), i == (buffersCount - 
1) ? promise : ctx.voidPromise());
+                    }
+                } finally {
+                    ReferenceCountUtil.safeRelease(b);
+                }
+            } else {
+                ctx.write(msg, promise);
+            }
+        }
+    }
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
deleted file mode 100644
index 7a95368..0000000
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.bookkeeper.util;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-
-/**
- * Removed custom implementation of DoubleByteBuf, just relying on straight 
regular CompositeByteBuf.
- */
-public class DoubleByteBuf extends CompositeByteBuf {
-
-    public DoubleByteBuf(ByteBufAllocator alloc) {
-        super(alloc, true, 2);
-    }
-
-    public static DoubleByteBuf get(ByteBuf b1, ByteBuf b2) {
-        DoubleByteBuf cbb = new DoubleByteBuf(b1.alloc());
-        cbb.addComponent(true, b1);
-        cbb.addComponent(true, b2);
-        return cbb;
-    }
-}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 25842f1..8e2dd7d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -17,11 +17,14 @@
  */
 package org.apache.bookkeeper.client;
 
-import static 
org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType.CRC32;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.security.GeneralSecurityException;
+
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
 
 /**
  * Client utilities.
@@ -32,11 +35,11 @@ public class ClientUtil {
         return generatePacket(ledgerId, entryId, lastAddConfirmed, length, 
data, 0, data.length);
     }
 
-    public static ByteBuf generatePacket(long ledgerId, long entryId, long 
lastAddConfirmed, long length,
-                                         byte[] data, int offset, int len) 
throws GeneralSecurityException {
-        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], 
CRC32);
-        return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, 
length,
-                                                    
Unpooled.wrappedBuffer(data, offset, len));
+    public static ByteBuf generatePacket(long ledgerId, long entryId, long 
lastAddConfirmed, long length, byte[] data,
+            int offset, int len) throws GeneralSecurityException {
+        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], 
DigestType.CRC32);
+        return 
ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, 
lastAddConfirmed, length,
+                Unpooled.wrappedBuffer(data, offset, len)));
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
index 2c3fb7d..7098f4e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
@@ -19,6 +19,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+
+import org.apache.bookkeeper.util.ByteBufList;
+
 /**
  * Adapter for tests to get the public access from LedgerHandle for its default
  * scope.
@@ -31,4 +35,9 @@ public class LedgerHandleAdapter {
     public static LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
         return lh.getLedgerMetadata();
     }
+
+    public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf 
data) {
+        return 
lh.getDigestManager().computeDigestAndPackageForSending(entryId, 
lh.getLastAddConfirmed(),
+                lh.addToLength(data.readableBytes()), data);
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 9a89c70..f177648 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
@@ -416,9 +417,10 @@ public abstract class MockBookKeeperTestCase {
                 if (mockEntry != null) {
                     LOG.info("readEntryAndFenceLedger - found mock entry {}@{} 
at {}", entryId, ledgerId,
                             bookieSocketAddress);
-                    ByteBuf entry = 
macManager.computeDigestAndPackageForSending(entryId, 
mockEntry.lastAddConfirmed,
-                        mockEntry.payload.length, 
Unpooled.wrappedBuffer(mockEntry.payload));
-                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, Unpooled.copiedBuffer(entry),
+                    ByteBufList entry = 
macManager.computeDigestAndPackageForSending(entryId,
+                            mockEntry.lastAddConfirmed, 
mockEntry.payload.length,
+                            Unpooled.wrappedBuffer(mockEntry.payload));
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, ByteBufList.coalesce(entry),
                             args[5]);
                     entry.release();
                 } else {
@@ -456,10 +458,10 @@ public abstract class MockBookKeeperTestCase {
                 }
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", 
entryId, ledgerId, bookieSocketAddress);
-                    ByteBuf entry = 
macManager.computeDigestAndPackageForSending(entryId,
+                    ByteBufList entry = 
macManager.computeDigestAndPackageForSending(entryId,
                         mockEntry.lastAddConfirmed, mockEntry.payload.length,
                         Unpooled.wrappedBuffer(mockEntry.payload));
-                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, Unpooled.copiedBuffer(entry),
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, ByteBufList.coalesce(entry),
                             args[4]);
                     entry.release();
                 } else {
@@ -532,7 +534,7 @@ public abstract class MockBookKeeperTestCase {
             return null;
         }).when(bookieClient).addEntry(any(BookieSocketAddress.class),
             anyLong(), any(byte[].class),
-            anyLong(), any(ByteBuf.class),
+            anyLong(), any(ByteBufList.class),
             any(BookkeeperInternalCallbacks.WriteCallback.class),
             any(), anyInt());
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 8e87921..8c49531 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -54,6 +54,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
@@ -360,7 +361,7 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         long entryId = 14;
         long lac = 8;
         byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8);
-        ByteBuf toSend =
+        ByteBufList toSend =
                 lh.macManager.computeDigestAndPackageForSending(
                         entryId, lac, lh.getLength() + 100, 
Unpooled.wrappedBuffer(data, 0, data.length));
         final CountDownLatch addLatch = new CountDownLatch(1);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 1e0750f..4cf2591 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -52,6 +52,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
@@ -156,7 +157,7 @@ public class BookieClientTest {
 
         BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
                                            scheduler, 
NullStatsLogger.INSTANCE);
-        ByteBuf bb = createByteBuffer(1, 1, 1);
+        ByteBufList bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, 
BookieProtocol.FLAG_NONE);
         synchronized (arc) {
             arc.wait(1000);
@@ -242,13 +243,13 @@ public class BookieClientTest {
         }
     }
 
-    private ByteBuf createByteBuffer(int i, long lid, long eid) {
+    private ByteBufList createByteBuffer(int i, long lid, long eid) {
         ByteBuf bb = Unpooled.buffer(4 + 24);
         bb.writeLong(lid);
         bb.writeLong(eid);
         bb.writeLong(eid - 1);
         bb.writeInt(i);
-        return bb;
+        return ByteBufList.get(bb);
     }
 
     @Test
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
new file mode 100644
index 0000000..19c841b
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.net.SocketAddress;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ByteBufList}.
+ */
+public class ByteBufListTest {
+    @Test
+    public void testSingle() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBufList buf = ByteBufList.get(b1);
+
+        assertEquals(1, buf.size());
+        assertEquals(128, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+    }
+
+    @Test
+    public void testDouble() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBufList buf = ByteBufList.get(b1, b2);
+
+        assertEquals(2, buf.size());
+        assertEquals(256, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+        assertEquals(b2, buf.getBuffer(1));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+        assertEquals(b2.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    @Test
+    public void testClone() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBufList buf = ByteBufList.get(b1, b2);
+
+        ByteBufList clone = ByteBufList.clone(buf);
+
+        assertEquals(2, buf.size());
+        assertEquals(256, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+        assertEquals(b2, buf.getBuffer(1));
+
+        assertEquals(2, clone.size());
+        assertEquals(256, clone.readableBytes());
+        assertEquals(b1, clone.getBuffer(0));
+        assertEquals(b2, clone.getBuffer(1));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(clone.refCnt(), 1);
+        assertEquals(b1.refCnt(), 2);
+        assertEquals(b2.refCnt(), 2);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(clone.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+        assertEquals(b2.refCnt(), 1);
+
+        clone.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(clone.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    @Test
+    public void testGetBytes() throws Exception {
+        ByteBufList buf = 
ByteBufList.get(Unpooled.wrappedBuffer("hello".getBytes()),
+                Unpooled.wrappedBuffer("world".getBytes()));
+
+        assertArrayEquals("helloworld".getBytes(), buf.toArray());
+
+        buf.prepend(Unpooled.wrappedBuffer("prefix-".getBytes()));
+        assertArrayEquals("prefix-helloworld".getBytes(), buf.toArray());
+
+        // Bigger buffer
+        byte[] buf100 = new byte[100];
+        int res = buf.getBytes(buf100);
+
+        assertEquals("prefix-helloworld".length(), res);
+
+        // Smaller buffer
+        byte[] buf4 = new byte[4];
+        res = buf.getBytes(buf4);
+
+        assertEquals(4, res);
+        assertEquals("pref", new String(buf4));
+    }
+
+    @Test
+    public void testCoalesce() throws Exception {
+        ByteBufList buf = 
ByteBufList.get(Unpooled.wrappedBuffer("hello".getBytes()),
+                Unpooled.wrappedBuffer("world".getBytes()));
+
+        assertEquals(Unpooled.wrappedBuffer("helloworld".getBytes()), 
ByteBufList.coalesce(buf));
+    }
+
+    @Test
+    public void testRetain() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBufList buf = ByteBufList.get(b1);
+
+        assertEquals(1, buf.size());
+        assertEquals(128, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.retain();
+
+        assertEquals(buf.refCnt(), 2);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+    }
+
+    @Test
+    public void testEncoder() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBufList buf = ByteBufList.get(b1, b2);
+
+        ChannelHandlerContext ctx = new MockChannelHandlerContext();
+
+        ByteBufList.ENCODER.write(ctx, buf, null);
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    class MockChannelHandlerContext implements ChannelHandlerContext {
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture disconnect() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture close() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture deregister() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise 
promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress, ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture disconnect(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture close(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture deregister(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture write(Object msg) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelFuture write(Object msg, ChannelPromise promise) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) 
{
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelPromise newPromise() {
+            return null;
+        }
+
+        @Override
+        public ChannelProgressivePromise newProgressivePromise() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture newSucceededFuture() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture newFailedFuture(Throwable cause) {
+            return null;
+        }
+
+        @Override
+        public ChannelPromise voidPromise() {
+            return null;
+        }
+
+        @Override
+        public Channel channel() {
+            return null;
+        }
+
+        @Override
+        public EventExecutor executor() {
+            return null;
+        }
+
+        @Override
+        public String name() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandler handler() {
+            return null;
+        }
+
+        @Override
+        public boolean isRemoved() {
+            return false;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelRegistered() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelUnregistered() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelActive() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelInactive() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireUserEventTriggered(Object evt) {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelRead(Object msg) {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelReadComplete() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelWritabilityChanged() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext read() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext flush() {
+            return null;
+        }
+
+        @Override
+        public ChannelPipeline pipeline() {
+            return null;
+        }
+
+        @Override
+        public ByteBufAllocator alloc() {
+            return null;
+        }
+
+        @Override
+        @Deprecated
+        public <T> Attribute<T> attr(AttributeKey<T> key) {
+            return null;
+        }
+
+        @Override
+        @Deprecated
+        public <T> boolean hasAttr(AttributeKey<T> key) {
+            return false;
+        }
+
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
deleted file mode 100644
index 82e706f..0000000
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.bookkeeper.util;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-/**
- * Test the Double byte buffer.
- */
-public class DoubleByteBufTest {
-
-    @Test
-    public void testGetBytes() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 4, 5, 6 });
-        doTest(b1, b2);
-    }
-
-    @Test
-    public void testGetBytesWithDoubleByteBufAssource() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b3 = Unpooled.wrappedBuffer(new byte[] { 5, 6 });
-
-        ByteBuf b23 = DoubleByteBuf.get(b2, b3);
-        doTest(b1, b23);
-    }
-
-    @Test
-    public void testGetBytesWithIndex() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 9, 9, 4, 5, 6 });
-
-        // Skip the two '9' from b2
-        b2.readByte();
-        b2.readByte();
-
-        doTest(b1, b2);
-    }
-
-    private void doTest(ByteBuf b1, ByteBuf b2) {
-        ByteBuf buf = DoubleByteBuf.get(b1, b2);
-
-        assertEquals(6, buf.readableBytes());
-        assertEquals(0, buf.writableBytes());
-
-        ByteBuf dst1 = Unpooled.buffer(6);
-        buf.getBytes(0, dst1);
-        assertEquals(6, dst1.readableBytes());
-        assertEquals(0, dst1.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 }), dst1);
-
-        ByteBuf dst2 = Unpooled.buffer(6);
-        buf.getBytes(0, dst2, 4);
-        assertEquals(4, dst2.readableBytes());
-        assertEquals(2, dst2.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4 }), dst2);
-
-        ByteBuf dst3 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
-        buf.getBytes(0, dst3, 1, 4);
-        assertEquals(6, dst3.readableBytes());
-        assertEquals(0, dst3.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 0 }), dst3);
-
-        ByteBuf dst4 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
-        buf.getBytes(2, dst4, 1, 3);
-        assertEquals(6, dst4.readableBytes());
-        assertEquals(0, dst4.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 3, 4, 5, 0, 0 }), dst4);
-
-        ByteBuf dst5 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
-        buf.getBytes(3, dst5, 1, 3);
-        assertEquals(6, dst5.readableBytes());
-        assertEquals(0, dst5.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 4, 5, 6, 0, 0 }), dst5);
-    }
-
-    @Test
-    public void testCopyToArray() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-
-        byte[] a1 = new byte[4];
-        b.getBytes(0, a1);
-        assertArrayEquals(new byte[] { 1, 2, 3, 4 }, a1);
-
-        byte[] a2 = new byte[3];
-        b.getBytes(1, a2);
-        assertArrayEquals(new byte[] { 2, 3, 4 }, a2);
-    }
-
-    @Test
-    public void testToByteBuffer() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-
-        assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
-    }
-
-    @Test
-    public void testNonDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertFalse(b1.isDirect());
-        assertFalse(b2.isDirect());
-        assertFalse(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertFalse(nioBuffer.isDirect());
-    }
-
-    @Test
-    public void testNonDirectPlusDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.directBuffer(2);
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertFalse(b1.isDirect());
-        assertTrue(b2.isDirect());
-        assertFalse(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertFalse(nioBuffer.isDirect());
-    }
-
-    @Test
-    public void testDirectPlusNonDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.directBuffer(2);
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertTrue(b1.isDirect());
-        assertFalse(b2.isDirect());
-        assertFalse(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertFalse(nioBuffer.isDirect());
-    }
-
-    @Test
-    public void testDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.directBuffer(2);
-        ByteBuf b2 = Unpooled.directBuffer(2);
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertTrue(b1.isDirect());
-        assertTrue(b2.isDirect());
-        assertTrue(b.isDirect());
-    }
-
-    /**
-     * Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer
-     * and readerIndex is increased by 64.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testReadableBytes() throws Exception {
-        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        b1.writerIndex(b1.capacity());
-        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        b2.writerIndex(b2.capacity());
-        ByteBuf buf = DoubleByteBuf.get(b1, b2);
-
-        assertEquals(buf.readerIndex(), 0);
-        assertEquals(buf.writerIndex(), 256);
-        assertEquals(buf.readableBytes(), 256);
-
-        for (int i = 0; i < 4; ++i) {
-            buf.skipBytes(64);
-            assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
-        }
-    }
-}
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
index 339f5d2..3995ea8 100644
--- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
@@ -23,11 +23,11 @@ package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.util.DoubleByteBuf;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -97,7 +97,7 @@ public class DigestTypeBenchmark {
         private DigestManager mac;
 
         private ByteBuf arrayBackedBuffer;
-        private ByteBuf notArrayBackedBuffer;
+        private CompositeByteBuf notArrayBackedBuffer;
         private ByteBuf byteBufDefaultAlloc;
 
         public ByteBuf digestBuf;
@@ -119,8 +119,9 @@ public class DigestTypeBenchmark {
             arrayBackedBuffer = Unpooled.wrappedBuffer(randomBytes(entrySize));
 
             final int headerSize = 32 + getDigestManager(digest).getMacCodeLength();
-            notArrayBackedBuffer = DoubleByteBuf.get(Unpooled.wrappedBuffer(randomBytes(headerSize)),
-                    Unpooled.wrappedBuffer((randomBytes(entrySize - headerSize))));
+            notArrayBackedBuffer = new CompositeByteBuf(ByteBufAllocator.DEFAULT, true, 2);
+            notArrayBackedBuffer.addComponent(Unpooled.wrappedBuffer(randomBytes(headerSize)));
+            notArrayBackedBuffer.addComponent(Unpooled.wrappedBuffer((randomBytes(entrySize - headerSize))));
 
             byteBufDefaultAlloc = ByteBufAllocator.DEFAULT.buffer(entrySize, entrySize);
             byteBufDefaultAlloc.writeBytes(randomBytes(entrySize));

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to