This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new df71247 Replace DoubleByteBuf with ByteBufList
df71247 is described below
commit df71247a8c9644bbeda7d18d833fde4bef88fb3d
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Feb 18 04:08:20 2018 -0800
Replace DoubleByteBuf with ByteBufList
This PR aims at a better approach to fix the issue reported in #1108
Instead of using `CompositeByteBuf`, which has the downside of needing
multiple object allocations per usage (~7-8), we should have a way to
aggregate multiple buffers into a single "logical" frame with no additional
overhead.
This PR introduces a `ByteBufList` class which is just a holder of a list
of buffers, without imitating the `ByteBuf` API. You can pass a `ByteBufList`
to the channel and have the encoder take care of the multiple writes.
Instances of ByteBufList are ref-counted and released to a pool after the
write is completed. There are no object allocations or memory copies involved.
Author: Matteo Merli <[email protected]>
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Venkateswararao Jujjuri
(JV) <None>
This closes #1141 from merlimat/bytebuf-list-master
---
.../apache/bookkeeper/benchmark/BenchBookie.java | 7 +-
.../bookkeeper/client/ExplicitLacFlushPolicy.java | 6 +-
.../client/LedgerFragmentReplicator.java | 8 +-
.../org/apache/bookkeeper/client/PendingAddOp.java | 3 +-
.../bookkeeper/client/PendingWriteLacOp.java | 7 +-
.../org/apache/bookkeeper/proto/BookieClient.java | 11 +-
.../apache/bookkeeper/proto/BookieNettyServer.java | 4 +
.../bookkeeper/proto/BookieProtoEncoding.java | 13 +-
.../apache/bookkeeper/proto/BookieProtocol.java | 63 ++-
.../bookkeeper/proto/PerChannelBookieClient.java | 11 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 4 +-
.../bookkeeper/proto/checksum/DigestManager.java | 14 +-
.../org/apache/bookkeeper/util/ByteBufList.java | 304 +++++++++++++++
.../org/apache/bookkeeper/util/DoubleByteBuf.java | 39 --
.../org/apache/bookkeeper/client/ClientUtil.java | 15 +-
.../bookkeeper/client/LedgerHandleAdapter.java | 9 +
.../bookkeeper/client/MockBookKeeperTestCase.java | 14 +-
.../client/ParallelLedgerRecoveryTest.java | 3 +-
.../apache/bookkeeper/test/BookieClientTest.java | 7 +-
.../apache/bookkeeper/util/ByteBufListTest.java | 422 +++++++++++++++++++++
.../apache/bookkeeper/util/DoubleByteBufTest.java | 197 ----------
.../proto/checksum/DigestTypeBenchmark.java | 9 +-
22 files changed, 871 insertions(+), 299 deletions(-)
diff --git
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 08f548c..db8ccbf 100644
---
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -186,7 +187,7 @@ public class BenchBookie {
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new
byte[20],
- entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+ entry, ByteBufList.get(toSend), tc, null,
BookieProtocol.FLAG_NONE);
}
LOG.info("Waiting for warmup");
tc.waitFor(warmUpCount);
@@ -203,7 +204,7 @@ public class BenchBookie {
toSend.writerIndex(toSend.capacity());
lc.resetComplete();
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new
byte[20],
- entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+ entry, ByteBufList.get(toSend), lc, null,
BookieProtocol.FLAG_NONE);
lc.waitForComplete();
}
long endTime = System.nanoTime();
@@ -221,7 +222,7 @@ public class BenchBookie {
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new
byte[20],
- entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+ entry, ByteBufList.get(toSend), tc, null,
BookieProtocol.FLAG_NONE);
}
tc.waitFor(throughputCount);
endTime = System.currentTimeMillis();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index 5acaeae..c77843a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -20,13 +20,12 @@
*/
package org.apache.bookkeeper.client;
-import io.netty.buffer.ByteBuf;
-
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.SyncCallbackUtils.LastAddConfirmedCallback;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,7 +138,8 @@ interface ExplicitLacFlushPolicy {
lh.bk.getMainWorkerPool().submit(new SafeRunnable() {
@Override
public void safeRun() {
- ByteBuf toSend =
lh.macManager.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
+ ByteBufList toSend = lh.macManager
+
.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
op.initiate(toSend);
}
});
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 31d8e21..5c2ff8c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -21,8 +21,8 @@ package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -44,6 +45,7 @@ import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
@@ -313,13 +315,13 @@ public class LedgerFragmentReplicator {
final long dataLength = data.length;
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);
- ByteBuf toSend = lh.getDigestManager()
+ ByteBufList toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
for (BookieSocketAddress newBookie : newBookies) {
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
- lh.getLedgerKey(), entryId, toSend.retainedSlice(),
+ lh.getLedgerKey(), entryId,
ByteBufList.clone(toSend),
multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD);
}
toSend.release();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 639d042..adf67a8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingAddOp.class);
ByteBuf payload;
- ByteBuf toSend;
+ ByteBufList toSend;
AddCallbackWithLatency cb;
Object ctx;
long entryId;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 950959b..af45e29 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -17,14 +17,13 @@
*/
package org.apache.bookkeeper.client;
-import io.netty.buffer.ByteBuf;
-
import java.util.BitSet;
import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
*/
class PendingWriteLacOp implements WriteLacCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingWriteLacOp.class);
- ByteBuf toSend;
+ ByteBufList toSend;
AddLacCallback cb;
long lac;
Object ctx;
@@ -75,7 +74,7 @@ class PendingWriteLacOp implements WriteLacCallback {
lac, toSend, this, bookieIndex);
}
- void initiate(ByteBuf toSend) {
+ void initiate(ByteBufList toSend) {
this.toSend = toSend;
DistributionSchedule.WriteSet writeSet =
lh.distributionSchedule.getWriteSet(lac);
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 08ed194..1623324 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
@@ -187,7 +188,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
}
public void writeLac(final BookieSocketAddress addr, final long ledgerId,
final byte[] masterKey,
- final long lac, final ByteBuf toSend, final WriteLacCallback cb,
final Object ctx) {
+ final long lac, final ByteBufList toSend, final WriteLacCallback
cb, final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
@@ -250,7 +251,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
final long ledgerId,
final byte[] masterKey,
final long entryId,
- final ByteBuf toSend,
+ final ByteBufList toSend,
final WriteCallback cb,
final Object ctx,
final int options) {
@@ -300,7 +301,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
private BookieClient bookieClient;
- private ByteBuf toSend;
+ private ByteBufList toSend;
private long ledgerId;
private long entryId;
private BookieSocketAddress addr;
@@ -310,7 +311,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
private byte[] masterKey;
static ChannelReadyForAddEntryCallback create(
- BookieClient bookieClient, ByteBuf toSend, long ledgerId,
+ BookieClient bookieClient, ByteBufList toSend, long ledgerId,
long entryId, BookieSocketAddress addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey) {
ChannelReadyForAddEntryCallback callback = RECYCLER.get();
@@ -619,7 +620,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
for (int i = 0; i < 100000; i++) {
counter.inc();
- bc.addEntry(addr, ledger, new byte[0], i,
Unpooled.wrappedBuffer(hello), cb, counter, 0);
+ bc.addEntry(addr, ledger, new byte[0], i,
ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0);
}
counter.wait(0);
System.out.println("Total = " + counter.total());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 3c5385c..83a3777 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -73,6 +73,7 @@ import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang.SystemUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -302,6 +303,9 @@ class BookieNettyServer {
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();
+ // For ByteBufList, skip the usual LengthFieldPrepender
and have the encoder itself add it
+ pipeline.addLast("bytebufList",
ByteBufList.ENCODER_WITH_SIZE);
+
pipeline.addLast("lengthbaseddecoder", new
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 33bb675..24ef117 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -41,7 +41,7 @@ import java.util.List;
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
-import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,9 +111,10 @@ public class BookieProtoEncoding {
ByteBuf buf = allocator.buffer(totalHeaderSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0,
BookieProtocol.MASTER_KEY_LENGTH);
- ByteBuf data = ar.getData();
+ ByteBufList data = ar.getData();
ar.recycle();
- return DoubleByteBuf.get(buf, data);
+ data.prepend(buf);
+ return data;
} else if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
@@ -166,7 +167,7 @@ public class BookieProtoEncoding {
// Read ledger and entry id without advancing the reader index
ledgerId = packet.getLong(packet.readerIndex());
entryId = packet.getLong(packet.readerIndex() + 8);
- return BookieProtocol.AddRequest.create(
+ return BookieProtocol.ParsedAddRequest.create(
version, ledgerId, entryId, flags,
masterKey, packet.retain());
}
@@ -246,7 +247,7 @@ public class BookieProtoEncoding {
BookieProtocol.ReadResponse rr =
(BookieProtocol.ReadResponse) r;
if (rr.hasData()) {
- return DoubleByteBuf.get(buf, rr.getData());
+ return ByteBufList.get(buf, rr.getData());
} else {
return buf;
}
@@ -258,7 +259,7 @@ public class BookieProtoEncoding {
return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthResponse) r).getAuthMessage();
- return DoubleByteBuf.get(buf,
Unpooled.wrappedBuffer(am.toByteArray()));
+ return ByteBufList.get(buf,
Unpooled.wrappedBuffer(am.toByteArray()));
} else {
LOG.error("Cannot encode unknown response type {}",
msg.getClass().getName());
return msg;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4ad23d9..3136c57 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -26,6 +26,7 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -244,11 +245,11 @@ public interface BookieProtocol {
* A Request that adds data.
*/
class AddRequest extends Request {
- ByteBuf data;
+ ByteBufList data;
static AddRequest create(byte protocolVersion, long ledgerId,
long entryId, short flags, byte[] masterKey,
- ByteBuf data) {
+ ByteBufList data) {
AddRequest add = RECYCLER.get();
add.protocolVersion = protocolVersion;
add.opCode = ADDENTRY;
@@ -260,8 +261,9 @@ public interface BookieProtocol {
return add;
}
- ByteBuf getData() {
- return data;
+ ByteBufList getData() {
+ // We need to have different ByteBufList instances for each bookie
write
+ return ByteBufList.clone(data);
}
boolean isRecoveryAdd() {
@@ -294,6 +296,59 @@ public interface BookieProtocol {
}
/**
+ * This is similar to add request, but it is used when processing the request
on the bookie side.
+ */
+ class ParsedAddRequest extends Request {
+ ByteBuf data;
+
+ static ParsedAddRequest create(byte protocolVersion, long ledgerId,
long entryId, short flags, byte[] masterKey,
+ ByteBuf data) {
+ ParsedAddRequest add = RECYCLER.get();
+ add.protocolVersion = protocolVersion;
+ add.opCode = ADDENTRY;
+ add.ledgerId = ledgerId;
+ add.entryId = entryId;
+ add.flags = flags;
+ add.masterKey = masterKey;
+ add.data = data.retain();
+ return add;
+ }
+
+ ByteBuf getData() {
+ // Unlike AddRequest, the parsed request holds a single ByteBuf that is
returned as-is, without cloning
+ return data;
+ }
+
+ boolean isRecoveryAdd() {
+ return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
+ }
+
+ void release() {
+ data.release();
+ }
+
+ private final Handle<ParsedAddRequest> recyclerHandle;
+ private ParsedAddRequest(Handle<ParsedAddRequest> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<ParsedAddRequest> RECYCLER = new
Recycler<ParsedAddRequest>() {
+ protected ParsedAddRequest newObject(Handle<ParsedAddRequest>
handle) {
+ return new ParsedAddRequest(handle);
+ }
+ };
+
+ @Override
+ public void recycle() {
+ ledgerId = -1;
+ entryId = -1;
+ masterKey = null;
+ data = null;
+ recyclerHandle.recycle(this);
+ }
+ }
+
+ /**
* A Request that reads data.
*/
class ReadRequest extends Request {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 75b3193..666b644 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -116,6 +116,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
@@ -423,6 +424,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4,
0, 4));
pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
@@ -494,7 +496,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
- void writeLac(final long ledgerId, final byte[] masterKey, final long lac,
ByteBuf toSend, WriteLacCallback cb,
+ void writeLac(final long ledgerId, final byte[] masterKey, final long lac,
ByteBufList toSend, WriteLacCallback cb,
Object ctx) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId,
@@ -513,7 +515,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setLedgerId(ledgerId)
.setLac(lac)
.setMasterKey(ByteString.copyFrom(masterKey))
- .setBody(ByteString.copyFrom(toSend.nioBuffer()));
+ .setBody(ByteString.copyFrom(toSend.toArray()));
final Request writeLacRequest = Request.newBuilder()
.setHeader(headerBuilder)
@@ -541,7 +543,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
* @param options
* Add options
*/
- void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ByteBuf toSend, WriteCallback cb,
+ void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ByteBufList toSend, WriteCallback cb,
Object ctx, final int options) {
Object request = null;
CompletionKey completionKey = null;
@@ -560,8 +562,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setOperation(OperationType.ADD_ENTRY)
.setTxnId(txnId);
- byte[] toSendArray = new byte[toSend.readableBytes()];
- toSend.getBytes(toSend.readerIndex(), toSendArray);
+ byte[] toSendArray = toSend.toArray();
AddRequest.Builder addBuilder = AddRequest.newBuilder()
.setLedgerId(ledgerId)
.setEntryId(entryId)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 34e2c2c..164a11b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -56,8 +56,8 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
@Override
protected void processPacket() {
- assert (request instanceof BookieProtocol.AddRequest);
- BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
+ assert (request instanceof BookieProtocol.ParsedAddRequest);
+ BookieProtocol.ParsedAddRequest add =
(BookieProtocol.ParsedAddRequest) request;
if (requestProcessor.bookie.isReadOnly()) {
LOG.warn("BookieServer is running in readonly mode,"
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index cf6b315..2f85e4f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto.checksum;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,6 +15,7 @@ package org.apache.bookkeeper.proto.checksum;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@@ -27,7 +26,7 @@ import java.security.GeneralSecurityException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +87,8 @@ public abstract class DigestManager {
* @param data
* @return
*/
- public ByteBuf computeDigestAndPackageForSending(long entryId, long
lastAddConfirmed, long length, ByteBuf data) {
+ public ByteBufList computeDigestAndPackageForSending(long entryId, long
lastAddConfirmed, long length,
+ ByteBuf data) {
ByteBuf headersBuffer =
PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(entryId);
@@ -99,7 +99,7 @@ public abstract class DigestManager {
update(data);
populateValueAndReset(headersBuffer);
- return DoubleByteBuf.get(headersBuffer, data);
+ return ByteBufList.get(headersBuffer, data);
}
/**
@@ -109,7 +109,7 @@ public abstract class DigestManager {
* @return
*/
- public ByteBuf computeDigestAndPackageForSendingLac(long lac) {
+ public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
ByteBuf headersBuffer =
PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(lac);
@@ -117,7 +117,7 @@ public abstract class DigestManager {
update(headersBuffer);
populateValueAndReset(headersBuffer);
- return headersBuffer;
+ return ByteBufList.get(headersBuffer);
}
private void verifyDigest(ByteBuf dataReceived) throws
BKDigestMatchException {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
new file mode 100644
index 0000000..37d4c72
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
+
+import java.util.ArrayList;
+
+/**
+ * ByteBufList is a holder of a sequence of {@link ByteBuf} objects.
+ *
+ * <p>This class doesn't try to mimic the {@link ByteBuf}, but rather
exposes itself just like a regular object which
+ * will need to be encoded on the channel. There are 2 utility encoders:
+ * <ul>
+ * <li>{@link #ENCODER}: regular encode that will write all the buffers in the
{@link ByteBufList} on the channel</li>
+ * <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also
prepend a 4 bytes size header, once, carrying
+ * the size of the readable bytes across all the buffers contained in the
{@link ByteBufList}</li>
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ * public void initChannel(SocketChannel ch) throws Exception {
+ * ChannelPipeline pipeline = ch.pipeline();
+ * pipeline.addLast("bytebufList", ByteBufList.ENCODER);
+ * pipeline.addLast("mainhandler", MyHandler.class);
+ * }
+ * });
+ * </pre>
+ *
+ * <p>ByteBufList is pooling the instances and uses ref-counting to release
them.
+ */
+public class ByteBufList extends AbstractReferenceCounted {
+ private final ArrayList<ByteBuf> buffers;
+ private final Handle<ByteBufList> recyclerHandle;
+
+ private static final int INITIAL_LIST_SIZE = 4;
+
+ private static final Recycler<ByteBufList> RECYCLER = new
Recycler<ByteBufList>() {
+ @Override
+ protected ByteBufList newObject(Recycler.Handle<ByteBufList> handle) {
+ return new ByteBufList(handle);
+ }
+ };
+
+ private ByteBufList(Handle<ByteBufList> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ this.buffers = new ArrayList<>(INITIAL_LIST_SIZE);
+ }
+
+ /**
+ * Get a new {@link ByteBufList} from the pool and assign 2 buffers to it.
+ *
+ * <p>The buffers b1 and b2 lifecycles are now managed by the ByteBufList:
when the {@link ByteBufList} is
+ * deallocated, b1 and b2 will be released as well.
+ *
+ * @param b1
+ * first buffer
+ * @param b2
+ * second buffer
+ * @return a {@link ByteBufList} instance from the pool
+ */
+ public static ByteBufList get(ByteBuf b1, ByteBuf b2) {
+ ByteBufList buf = get();
+ buf.add(b1);
+ buf.add(b2);
+ return buf;
+ }
+
+ /**
+ * Get a new {@link ByteBufList} from the pool and assign 1 buffer to it.
+ *
+ * <p>The buffer b1 lifecycle is now managed by the ByteBufList: when the
{@link ByteBufList} is
+ * deallocated, b1 will be released as well.
+ *
+ * @param b1
+ * first buffer
+ * @return a {@link ByteBufList} instance from the pool
+ */
+ public static ByteBufList get(ByteBuf b1) {
+ ByteBufList buf = get();
+ buf.add(b1);
+ return buf;
+ }
+
+ /**
+ * Get a new {@link ByteBufList} instance from the pool that is the clone
of an already existing instance.
+ */
+ public static ByteBufList clone(ByteBufList other) {
+ ByteBufList buf = get();
+ for (int i = 0; i < other.buffers.size(); i++) {
+ // Create a duplicate of the buffer so that there is no
interference from other threads
+ buf.add(other.buffers.get(i).retainedDuplicate());
+ }
+ return buf;
+ }
+
+ private static ByteBufList get() {
+ ByteBufList buf = RECYCLER.get();
+ buf.setRefCnt(1);
+ return buf;
+ }
+
+ /**
+ * Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
+ */
+ public void add(ByteBuf buf) {
+ buffers.add(buf);
+ }
+
+ /**
+ * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
+ */
+ public void prepend(ByteBuf buf) {
+ buffers.add(0, buf);
+ }
+
+ /**
+ * @return the total amount of readable bytes across all the {@link
ByteBuf} included in the list
+ */
+ public int readableBytes() {
+ int readableBytes = 0;
+ for (int i = 0; i < buffers.size(); i++) {
+ readableBytes += buffers.get(i).readableBytes();
+ }
+ return readableBytes;
+ }
+
+ /**
+ * Get access to a particular buffer in the list.
+ *
+ * @param index
+ * the index of the buffer
+ * @return the buffer
+ */
+ public ByteBuf getBuffer(int index) {
+ return buffers.get(index);
+ }
+
+ /**
+ * @return the number of buffers included in the {@link ByteBufList}
+ */
+ public int size() {
+ return buffers.size();
+ }
+
+ /**
+ * Write bytes from the current {@link ByteBufList} into a byte array.
+ *
+ * <p>This won't modify the reader index of the internal buffers.
+ *
+ * @param dst
+ * the destination byte array
+ * @return the number of copied bytes
+ */
+ public int getBytes(byte[] dst) {
+ int copied = 0;
+ for (int idx = 0; idx < buffers.size() && copied < dst.length; idx++) {
+ ByteBuf b = buffers.get(idx);
+ int len = Math.min(b.readableBytes(), dst.length - copied);
+ b.getBytes(b.readerIndex(), dst, copied, len);
+
+ copied += len;
+ }
+
+ return copied;
+ }
+
+ /**
+ * @return an array containing all the internal buffers content
+ */
+ public byte[] toArray() {
+ byte[] a = new byte[readableBytes()];
+ getBytes(a);
+ return a;
+ }
+
+ /**
+ * @return a single buffer with the content of all the individual buffers
+ */
+ @VisibleForTesting
+ public static ByteBuf coalesce(ByteBufList list) {
+ ByteBuf res = Unpooled.buffer(list.readableBytes());
+ for (int i = 0; i < list.buffers.size(); i++) {
+ ByteBuf b = list.buffers.get(i);
+ res.writeBytes(b, b.readerIndex(), b.readableBytes());
+ }
+
+ return res;
+ }
+
+ @Override
+ public ByteBufList retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ protected void deallocate() {
+ for (int i = 0; i < buffers.size(); i++) {
+ buffers.get(i).release();
+ }
+
+ buffers.clear();
+ recyclerHandle.recycle(this);
+ }
+
+ @Override
+ public ReferenceCounted touch(Object hint) {
+ for (int i = 0; i < buffers.size(); i++) {
+ buffers.get(i).touch(hint);
+ }
+ return this;
+ }
+
+ /**
+ * Encoder for the {@link ByteBufList} that doesn't prepend any size
header.
+ */
+ public static final Encoder ENCODER = new Encoder(false);
+
+ /**
+ * Encoder for the {@link ByteBufList} that will prepend a 4 byte header
with the size of the whole
+ * {@link ByteBufList} readable bytes.
+ */
+ public static final Encoder ENCODER_WITH_SIZE = new Encoder(true);
+
+ /**
+ * {@link ByteBufList} encoder.
+ */
+ @Sharable
+ public static class Encoder extends ChannelOutboundHandlerAdapter {
+
+ private final boolean prependSize;
+
+ public Encoder(boolean prependSize) {
+ this.prependSize = prependSize;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
+ if (msg instanceof ByteBufList) {
+ ByteBufList b = (ByteBufList) msg;
+
+ try {
+ if (prependSize) {
+ // Prepend the frame size before writing the buffer
list, so that we only have 1 single size
+ // header
+ ByteBuf sizeBuffer =
PooledByteBufAllocator.DEFAULT.directBuffer(4, 4);
+ sizeBuffer.writeInt(b.readableBytes());
+ ctx.write(sizeBuffer, ctx.voidPromise());
+ }
+
+ // Write each buffer individually on the socket. The
retain() here is needed to preserve the fact
+ // that ByteBuf are automatically released after a write.
If the ByteBufList ref count is increased
+ // and it gets written multiple times, the individual
buffers refcount should be reflected as well.
+ int buffersCount = b.buffers.size();
+ for (int i = 0; i < buffersCount; i++) {
+ ByteBuf bx = b.buffers.get(i);
+ // Last buffer will carry on the final promise to
notify when everything was written on the
+ // socket
+ ctx.write(bx.retainedDuplicate(), i == (buffersCount -
1) ? promise : ctx.voidPromise());
+ }
+ } finally {
+ ReferenceCountUtil.safeRelease(b);
+ }
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+ }
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
deleted file mode 100644
index 7a95368..0000000
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.bookkeeper.util;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-
-/**
- * Removed custom implementation of DoubleByteBuf, just relying on straight
regular CompositeByteBuf.
- */
-public class DoubleByteBuf extends CompositeByteBuf {
-
- public DoubleByteBuf(ByteBufAllocator alloc) {
- super(alloc, true, 2);
- }
-
- public static DoubleByteBuf get(ByteBuf b1, ByteBuf b2) {
- DoubleByteBuf cbb = new DoubleByteBuf(b1.alloc());
- cbb.addComponent(true, b1);
- cbb.addComponent(true, b2);
- return cbb;
- }
-}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 25842f1..8e2dd7d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -17,11 +17,14 @@
*/
package org.apache.bookkeeper.client;
-import static
org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType.CRC32;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+
import java.security.GeneralSecurityException;
+
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
/**
* Client utilities.
@@ -32,11 +35,11 @@ public class ClientUtil {
return generatePacket(ledgerId, entryId, lastAddConfirmed, length,
data, 0, data.length);
}
- public static ByteBuf generatePacket(long ledgerId, long entryId, long
lastAddConfirmed, long length,
- byte[] data, int offset, int len)
throws GeneralSecurityException {
- DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2],
CRC32);
- return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
length,
-
Unpooled.wrappedBuffer(data, offset, len));
+ public static ByteBuf generatePacket(long ledgerId, long entryId, long
lastAddConfirmed, long length, byte[] data,
+ int offset, int len) throws GeneralSecurityException {
+ DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2],
DigestType.CRC32);
+ return
ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId,
lastAddConfirmed, length,
+ Unpooled.wrappedBuffer(data, offset, len)));
}
/**
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
index 2c3fb7d..7098f4e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
@@ -19,6 +19,10 @@
*/
package org.apache.bookkeeper.client;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.bookkeeper.util.ByteBufList;
+
/**
* Adapter for tests to get the public access from LedgerHandle for its default
* scope.
@@ -31,4 +35,9 @@ public class LedgerHandleAdapter {
public static LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
return lh.getLedgerMetadata();
}
+
+ public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf
data) {
+ return
lh.getDigestManager().computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(),
+ lh.addToLength(data.readableBytes()), data);
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 9a89c70..f177648 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
import org.junit.Before;
@@ -416,9 +417,10 @@ public abstract class MockBookKeeperTestCase {
if (mockEntry != null) {
LOG.info("readEntryAndFenceLedger - found mock entry {}@{}
at {}", entryId, ledgerId,
bookieSocketAddress);
- ByteBuf entry =
macManager.computeDigestAndPackageForSending(entryId,
mockEntry.lastAddConfirmed,
- mockEntry.payload.length,
Unpooled.wrappedBuffer(mockEntry.payload));
- callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, Unpooled.copiedBuffer(entry),
+ ByteBufList entry =
macManager.computeDigestAndPackageForSending(entryId,
+ mockEntry.lastAddConfirmed,
mockEntry.payload.length,
+ Unpooled.wrappedBuffer(mockEntry.payload));
+ callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, ByteBufList.coalesce(entry),
args[5]);
entry.release();
} else {
@@ -456,10 +458,10 @@ public abstract class MockBookKeeperTestCase {
}
if (mockEntry != null) {
LOG.info("readEntry - found mock entry {}@{} at {}",
entryId, ledgerId, bookieSocketAddress);
- ByteBuf entry =
macManager.computeDigestAndPackageForSending(entryId,
+ ByteBufList entry =
macManager.computeDigestAndPackageForSending(entryId,
mockEntry.lastAddConfirmed, mockEntry.payload.length,
Unpooled.wrappedBuffer(mockEntry.payload));
- callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, Unpooled.copiedBuffer(entry),
+ callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, ByteBufList.coalesce(entry),
args[4]);
entry.release();
} else {
@@ -532,7 +534,7 @@ public abstract class MockBookKeeperTestCase {
return null;
}).when(bookieClient).addEntry(any(BookieSocketAddress.class),
anyLong(), any(byte[].class),
- anyLong(), any(ByteBuf.class),
+ anyLong(), any(ByteBufList.class),
any(BookkeeperInternalCallbacks.WriteCallback.class),
any(), anyInt());
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 8e87921..8c49531 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -54,6 +54,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
@@ -360,7 +361,7 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
long entryId = 14;
long lac = 8;
byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8);
- ByteBuf toSend =
+ ByteBufList toSend =
lh.macManager.computeDigestAndPackageForSending(
entryId, lac, lh.getLength() + 100,
Unpooled.wrappedBuffer(data, 0, data.length));
final CountDownLatch addLatch = new CountDownLatch(1);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 1e0750f..4cf2591 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -52,6 +52,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
@@ -156,7 +157,7 @@ public class BookieClientTest {
BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
scheduler,
NullStatsLogger.INSTANCE);
- ByteBuf bb = createByteBuffer(1, 1, 1);
+ ByteBufList bb = createByteBuffer(1, 1, 1);
bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc,
BookieProtocol.FLAG_NONE);
synchronized (arc) {
arc.wait(1000);
@@ -242,13 +243,13 @@ public class BookieClientTest {
}
}
- private ByteBuf createByteBuffer(int i, long lid, long eid) {
+ private ByteBufList createByteBuffer(int i, long lid, long eid) {
ByteBuf bb = Unpooled.buffer(4 + 24);
bb.writeLong(lid);
bb.writeLong(eid);
bb.writeLong(eid - 1);
bb.writeInt(i);
- return bb;
+ return ByteBufList.get(bb);
}
@Test
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
new file mode 100644
index 0000000..19c841b
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.net.SocketAddress;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ByteBufList}.
+ */
+public class ByteBufListTest {
+ @Test
+ public void testSingle() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBufList buf = ByteBufList.get(b1);
+
+ assertEquals(1, buf.size());
+ assertEquals(128, buf.readableBytes());
+ assertEquals(b1, buf.getBuffer(0));
+
+ assertEquals(buf.refCnt(), 1);
+ assertEquals(b1.refCnt(), 1);
+
+ buf.release();
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(b1.refCnt(), 0);
+ }
+
+ @Test
+ public void testDouble() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b2.writerIndex(b2.capacity());
+ ByteBufList buf = ByteBufList.get(b1, b2);
+
+ assertEquals(2, buf.size());
+ assertEquals(256, buf.readableBytes());
+ assertEquals(b1, buf.getBuffer(0));
+ assertEquals(b2, buf.getBuffer(1));
+
+ assertEquals(buf.refCnt(), 1);
+ assertEquals(b1.refCnt(), 1);
+ assertEquals(b2.refCnt(), 1);
+
+ buf.release();
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(b1.refCnt(), 0);
+ assertEquals(b2.refCnt(), 0);
+ }
+
+ @Test
+ public void testClone() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b2.writerIndex(b2.capacity());
+ ByteBufList buf = ByteBufList.get(b1, b2);
+
+ ByteBufList clone = ByteBufList.clone(buf);
+
+ assertEquals(2, buf.size());
+ assertEquals(256, buf.readableBytes());
+ assertEquals(b1, buf.getBuffer(0));
+ assertEquals(b2, buf.getBuffer(1));
+
+ assertEquals(2, clone.size());
+ assertEquals(256, clone.readableBytes());
+ assertEquals(b1, clone.getBuffer(0));
+ assertEquals(b2, clone.getBuffer(1));
+
+ assertEquals(buf.refCnt(), 1);
+ assertEquals(clone.refCnt(), 1);
+ assertEquals(b1.refCnt(), 2);
+ assertEquals(b2.refCnt(), 2);
+
+ buf.release();
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(clone.refCnt(), 1);
+ assertEquals(b1.refCnt(), 1);
+ assertEquals(b2.refCnt(), 1);
+
+ clone.release();
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(clone.refCnt(), 0);
+ assertEquals(b1.refCnt(), 0);
+ assertEquals(b2.refCnt(), 0);
+ }
+
+ @Test
+ public void testGetBytes() throws Exception {
+ ByteBufList buf =
ByteBufList.get(Unpooled.wrappedBuffer("hello".getBytes()),
+ Unpooled.wrappedBuffer("world".getBytes()));
+
+ assertArrayEquals("helloworld".getBytes(), buf.toArray());
+
+ buf.prepend(Unpooled.wrappedBuffer("prefix-".getBytes()));
+ assertArrayEquals("prefix-helloworld".getBytes(), buf.toArray());
+
+ // Bigger buffer
+ byte[] buf100 = new byte[100];
+ int res = buf.getBytes(buf100);
+
+ assertEquals("prefix-helloworld".length(), res);
+
+ // Smaller buffer
+ byte[] buf4 = new byte[4];
+ res = buf.getBytes(buf4);
+
+ assertEquals(4, res);
+ assertEquals("pref", new String(buf4));
+ }
+
+ @Test
+ public void testCoalesce() throws Exception {
+ ByteBufList buf =
ByteBufList.get(Unpooled.wrappedBuffer("hello".getBytes()),
+ Unpooled.wrappedBuffer("world".getBytes()));
+
+ assertEquals(Unpooled.wrappedBuffer("helloworld".getBytes()),
ByteBufList.coalesce(buf));
+ }
+
+ @Test
+ public void testRetain() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBufList buf = ByteBufList.get(b1);
+
+ assertEquals(1, buf.size());
+ assertEquals(128, buf.readableBytes());
+ assertEquals(b1, buf.getBuffer(0));
+
+ assertEquals(buf.refCnt(), 1);
+ assertEquals(b1.refCnt(), 1);
+
+ buf.retain();
+
+ assertEquals(buf.refCnt(), 2);
+ assertEquals(b1.refCnt(), 1);
+
+ buf.release();
+
+ assertEquals(buf.refCnt(), 1);
+ assertEquals(b1.refCnt(), 1);
+
+ buf.release();
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(b1.refCnt(), 0);
+ }
+
+ @Test
+ public void testEncoder() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b2.writerIndex(b2.capacity());
+ ByteBufList buf = ByteBufList.get(b1, b2);
+
+ ChannelHandlerContext ctx = new MockChannelHandlerContext();
+
+ ByteBufList.ENCODER.write(ctx, buf, null);
+
+ assertEquals(buf.refCnt(), 0);
+ assertEquals(b1.refCnt(), 0);
+ assertEquals(b2.refCnt(), 0);
+ }
+
+ class MockChannelHandlerContext implements ChannelHandlerContext {
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress, ChannelPromise
promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+ ReferenceCountUtil.safeRelease(msg);
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+ ReferenceCountUtil.safeRelease(msg);
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise)
{
+ ReferenceCountUtil.safeRelease(msg);
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ ReferenceCountUtil.safeRelease(msg);
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return null;
+ }
+
+ @Override
+ public Channel channel() {
+ return null;
+ }
+
+ @Override
+ public EventExecutor executor() {
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandler handler() {
+ return null;
+ }
+
+ @Override
+ public boolean isRemoved() {
+ return false;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRegistered() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelUnregistered() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelActive() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelInactive() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireUserEventTriggered(Object evt) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRead(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelReadComplete() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelWritabilityChanged() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext read() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext flush() {
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ @Deprecated
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+ return null;
+ }
+
+ @Override
+ @Deprecated
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
+ }
+
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
deleted file mode 100644
index 82e706f..0000000
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.bookkeeper.util;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-/**
- * Test the Double byte buffer.
- */
-public class DoubleByteBufTest {
-
- @Test
- public void testGetBytes() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 4, 5, 6 });
- doTest(b1, b2);
- }
-
- @Test
- public void testGetBytesWithDoubleByteBufAssource() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
- ByteBuf b3 = Unpooled.wrappedBuffer(new byte[] { 5, 6 });
-
- ByteBuf b23 = DoubleByteBuf.get(b2, b3);
- doTest(b1, b23);
- }
-
- @Test
- public void testGetBytesWithIndex() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 9, 9, 4, 5, 6 });
-
- // Skip the two '9' from b2
- b2.readByte();
- b2.readByte();
-
- doTest(b1, b2);
- }
-
- private void doTest(ByteBuf b1, ByteBuf b2) {
- ByteBuf buf = DoubleByteBuf.get(b1, b2);
-
- assertEquals(6, buf.readableBytes());
- assertEquals(0, buf.writableBytes());
-
- ByteBuf dst1 = Unpooled.buffer(6);
- buf.getBytes(0, dst1);
- assertEquals(6, dst1.readableBytes());
- assertEquals(0, dst1.writableBytes());
- assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 }),
dst1);
-
- ByteBuf dst2 = Unpooled.buffer(6);
- buf.getBytes(0, dst2, 4);
- assertEquals(4, dst2.readableBytes());
- assertEquals(2, dst2.writableBytes());
- assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4 }), dst2);
-
- ByteBuf dst3 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
- buf.getBytes(0, dst3, 1, 4);
- assertEquals(6, dst3.readableBytes());
- assertEquals(0, dst3.writableBytes());
- assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 0 }),
dst3);
-
- ByteBuf dst4 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
- buf.getBytes(2, dst4, 1, 3);
- assertEquals(6, dst4.readableBytes());
- assertEquals(0, dst4.writableBytes());
- assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 3, 4, 5, 0, 0 }),
dst4);
-
- ByteBuf dst5 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
- buf.getBytes(3, dst5, 1, 3);
- assertEquals(6, dst5.readableBytes());
- assertEquals(0, dst5.writableBytes());
- assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 4, 5, 6, 0, 0 }),
dst5);
- }
-
- @Test
- public void testCopyToArray() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
- ByteBuf b = DoubleByteBuf.get(b1, b2);
-
- byte[] a1 = new byte[4];
- b.getBytes(0, a1);
- assertArrayEquals(new byte[] { 1, 2, 3, 4 }, a1);
-
- byte[] a2 = new byte[3];
- b.getBytes(1, a2);
- assertArrayEquals(new byte[] { 2, 3, 4 }, a2);
- }
-
- @Test
- public void testToByteBuffer() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
- ByteBuf b = DoubleByteBuf.get(b1, b2);
-
- assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }),
b.nioBuffer());
- }
-
- @Test
- public void testNonDirectNioBuffer() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
- ByteBuf b = DoubleByteBuf.get(b1, b2);
- assertFalse(b1.isDirect());
- assertFalse(b2.isDirect());
- assertFalse(b.isDirect());
- ByteBuffer nioBuffer = b.nioBuffer();
- assertFalse(nioBuffer.isDirect());
- }
-
- @Test
- public void testNonDirectPlusDirectNioBuffer() {
- ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
- ByteBuf b2 = Unpooled.directBuffer(2);
- ByteBuf b = DoubleByteBuf.get(b1, b2);
- assertFalse(b1.isDirect());
- assertTrue(b2.isDirect());
- assertFalse(b.isDirect());
- ByteBuffer nioBuffer = b.nioBuffer();
- assertFalse(nioBuffer.isDirect());
- }
-
- @Test
- public void testDirectPlusNonDirectNioBuffer() {
- ByteBuf b1 = Unpooled.directBuffer(2);
- ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
- ByteBuf b = DoubleByteBuf.get(b1, b2);
- assertTrue(b1.isDirect());
- assertFalse(b2.isDirect());
- assertFalse(b.isDirect());
- ByteBuffer nioBuffer = b.nioBuffer();
- assertFalse(nioBuffer.isDirect());
- }
-
- @Test
- public void testDirectNioBuffer() {
- ByteBuf b1 = Unpooled.directBuffer(2);
- ByteBuf b2 = Unpooled.directBuffer(2);
- ByteBuf b = DoubleByteBuf.get(b1, b2);
- assertTrue(b1.isDirect());
- assertTrue(b2.isDirect());
- assertTrue(b.isDirect());
- }
-
- /**
- * Verify that readableBytes() returns writerIndex - readerIndex. In this
case writerIndex is the end of the buffer
- * and readerIndex is increased by 64.
- *
- * @throws Exception
- */
- @Test
- public void testReadableBytes() throws Exception {
- ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
- b1.writerIndex(b1.capacity());
- ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
- b2.writerIndex(b2.capacity());
- ByteBuf buf = DoubleByteBuf.get(b1, b2);
-
- assertEquals(buf.readerIndex(), 0);
- assertEquals(buf.writerIndex(), 256);
- assertEquals(buf.readableBytes(), 256);
-
- for (int i = 0; i < 4; ++i) {
- buf.skipBytes(64);
- assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
- }
- }
-}
diff --git
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
index 339f5d2..3995ea8 100644
---
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
+++
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
@@ -23,11 +23,11 @@ package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.util.DoubleByteBuf;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -97,7 +97,7 @@ public class DigestTypeBenchmark {
private DigestManager mac;
private ByteBuf arrayBackedBuffer;
- private ByteBuf notArrayBackedBuffer;
+ private CompositeByteBuf notArrayBackedBuffer;
private ByteBuf byteBufDefaultAlloc;
public ByteBuf digestBuf;
@@ -119,8 +119,9 @@ public class DigestTypeBenchmark {
arrayBackedBuffer = Unpooled.wrappedBuffer(randomBytes(entrySize));
final int headerSize = 32 +
getDigestManager(digest).getMacCodeLength();
- notArrayBackedBuffer =
DoubleByteBuf.get(Unpooled.wrappedBuffer(randomBytes(headerSize)),
- Unpooled.wrappedBuffer((randomBytes(entrySize -
headerSize))));
+ notArrayBackedBuffer = new
CompositeByteBuf(ByteBufAllocator.DEFAULT, true, 2);
+
notArrayBackedBuffer.addComponent(Unpooled.wrappedBuffer(randomBytes(headerSize)));
+
notArrayBackedBuffer.addComponent(Unpooled.wrappedBuffer((randomBytes(entrySize
- headerSize))));
byteBufDefaultAlloc = ByteBufAllocator.DEFAULT.buffer(entrySize,
entrySize);
byteBufDefaultAlloc.writeBytes(randomBytes(entrySize));
--
To stop receiving notification emails like this one, please contact
[email protected].