This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b7bfac8 Refactored EntryLogger Buffered channel
b7bfac8 is described below
commit b7bfac88ffae8acd82cae05fe6236f5a7db3a796
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Dec 8 18:19:23 2017 -0800
Refactored EntryLogger Buffered channel
This change is a port of
https://github.com/yahoo/bookkeeper/commit/ecce14449b07e0b40434835bbc01d75d0a36880b
from the Yahoo branch.
Refactored entry logger to always use pooled `ByteBuf` when reading entries
or scanning the log.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #824 from merlimat/entry-logger-refactor and squashes the
following commits:
6ddb31d6 [Matteo Merli] Fix DoubleByteBuf readableBytes()
a6df2974 [Matteo Merli] Fixed padding length and journal test
bbe65eb8 [Matteo Merli] Removed BufferedReadChannel close since the buffer
was not pooled anyway
c50f4683 [Matteo Merli] Added required changes to uses of BufferedChannel
from Journal
6746d038 [Matteo Merli] Refactored EntryLogger Buffered channel
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 41 +--
.../apache/bookkeeper/bookie/BufferedChannel.java | 97 +++----
.../bookkeeper/bookie/BufferedChannelBase.java | 2 +-
.../bookkeeper/bookie/BufferedReadChannel.java | 41 +--
.../bookkeeper/bookie/EntryLogCompactor.java | 11 +-
.../org/apache/bookkeeper/bookie/EntryLogger.java | 321 ++++++++++-----------
.../apache/bookkeeper/bookie/EntryMemTable.java | 2 +-
.../bookie/InterleavedLedgerStorage.java | 10 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 38 ++-
.../apache/bookkeeper/bookie/SkipListFlusher.java | 5 +-
.../bookkeeper/bookie/SortedLedgerStorage.java | 3 +-
.../bookie/TransactionalEntryLogCompactor.java | 19 +-
.../org/apache/bookkeeper/util/DoubleByteBuf.java | 5 -
.../bookkeeper/bookie/BookieJournalTest.java | 75 +++--
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 24 +-
.../bookkeeper/bookie/TestEntryMemTable.java | 13 +-
.../org/apache/bookkeeper/bookie/UpgradeTest.java | 5 +-
.../apache/bookkeeper/util/DoubleByteBufTest.java | 25 ++
18 files changed, 374 insertions(+), 363 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index fa4d410..1d19a30 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -23,6 +23,10 @@ import static com.google.common.base.Charsets.UTF_8;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -52,6 +56,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
@@ -102,6 +107,7 @@ import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Bookie Shell is to provide utilities for users to administer a bookkeeper
cluster.
*/
@@ -2449,7 +2455,7 @@ public class BookieShell implements Tool {
return true;
}
@Override
- public void process(long ledgerId, long startPos, ByteBuffer
entry) {
+ public void process(long ledgerId, long startPos, ByteBuf entry) {
formatEntry(startPos, entry, printMsg);
}
});
@@ -2476,10 +2482,9 @@ public class BookieShell implements Tool {
}
@Override
- public void process(long ledgerId, long startPos, ByteBuffer
entry) {
- long entrysLedgerId = entry.getLong();
- long entrysEntryId = entry.getLong();
- entry.rewind();
+ public void process(long ledgerId, long startPos, ByteBuf entry) {
+ long entrysLedgerId = entry.getLong(entry.readerIndex());
+ long entrysEntryId = entry.getLong(entry.readerIndex() + 8);
if ((ledgerId == entrysLedgerId) && ((entrysEntryId ==
entryId)) || (entryId == -1)) {
entryFound.setValue(true);
formatEntry(startPos, entry, printMsg);
@@ -2515,12 +2520,12 @@ public class BookieShell implements Tool {
}
@Override
- public void process(long ledgerId, long entryStartPos, ByteBuffer
entry) {
+ public void process(long ledgerId, long entryStartPos, ByteBuf
entry) {
if (!stopScanning.booleanValue()) {
if ((rangeEndPos != -1) && (entryStartPos > rangeEndPos)) {
stopScanning.setValue(true);
} else {
- int entrySize = entry.limit();
+ int entrySize = entry.readableBytes();
/**
* entrySize of an entry (inclusive of payload and
* header) value is stored as int value in log file,
but
@@ -2562,7 +2567,7 @@ public class BookieShell implements Tool {
System.out.println("Journal Version : " + journalVersion);
printJournalVersion = true;
}
- formatEntry(offset, entry, printMsg);
+ formatEntry(offset, Unpooled.wrappedBuffer(entry), printMsg);
}
});
}
@@ -2608,17 +2613,17 @@ public class BookieShell implements Tool {
* @param printMsg
* Whether printing the message body
*/
- private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
- long ledgerId = recBuff.getLong();
- long entryId = recBuff.getLong();
- int entrySize = recBuff.limit();
+ private void formatEntry(long pos, ByteBuf recBuff, boolean printMsg) {
+ int entrySize = recBuff.readableBytes();
+ long ledgerId = recBuff.readLong();
+ long entryId = recBuff.readLong();
System.out.println("--------- Lid=" + ledgerId + ", Eid=" + entryId
+ ", ByteOffset=" + pos + ", EntrySize=" + entrySize
+ " ---------");
if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
- int masterKeyLen = recBuff.getInt();
+ int masterKeyLen = recBuff.readInt();
byte[] masterKey = new byte[masterKeyLen];
- recBuff.get(masterKey);
+ recBuff.readBytes(masterKey);
System.out.println("Type: META");
System.out.println("MasterKey: " + bytes2Hex(masterKey));
System.out.println();
@@ -2631,7 +2636,7 @@ public class BookieShell implements Tool {
return;
}
// process a data entry
- long lastAddConfirmed = recBuff.getLong();
+ long lastAddConfirmed = recBuff.readLong();
System.out.println("Type: DATA");
System.out.println("LastConfirmed: " + lastAddConfirmed);
if (!printMsg) {
@@ -2639,12 +2644,12 @@ public class BookieShell implements Tool {
return;
}
// skip digest checking
- recBuff.position(32 + 8);
+ recBuff.skipBytes(8);
System.out.println("Data:");
System.out.println();
try {
- byte[] ret = new byte[recBuff.remaining()];
- recBuff.get(ret);
+ byte[] ret = new byte[recBuff.readableBytes()];
+ recBuff.readBytes(ret);
formatter.formatEntry(ret);
} catch (Exception e) {
System.out.println("N/A. Corrupted.");
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 10affae..0d21d41 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -21,22 +21,25 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.bookkeeper.util.ZeroBuffer;
/**
* Provides a buffering layer in front of a FileChannel.
*/
-public class BufferedChannel extends BufferedReadChannel {
+public class BufferedChannel extends BufferedReadChannel implements Closeable {
// The capacity of the write buffer.
protected final int writeCapacity;
// The position of the file channel's write pointer.
protected AtomicLong writeBufferStartPosition = new AtomicLong(0);
// The buffer used to write operations.
- protected final ByteBuffer writeBuffer;
+ protected final ByteBuf writeBuffer;
// The absolute position of the next write operation.
protected volatile long position;
@@ -48,12 +51,15 @@ public class BufferedChannel extends BufferedReadChannel {
public BufferedChannel(FileChannel fc, int writeCapacity, int
readCapacity) throws IOException {
super(fc, readCapacity);
- // Set the read buffer's limit to readCapacity.
- this.readBuffer.limit(readCapacity);
this.writeCapacity = writeCapacity;
this.position = fc.position();
this.writeBufferStartPosition.set(position);
- this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
+ this.writeBuffer =
ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+ }
+
+ @Override
+ public void close() throws IOException {
+ writeBuffer.release();
}
/**
@@ -64,19 +70,16 @@ public class BufferedChannel extends BufferedReadChannel {
* @param src The source ByteBuffer which contains the data to be written.
* @throws IOException if a write operation fails.
*/
- public synchronized void write(ByteBuffer src) throws IOException {
+ public synchronized void write(ByteBuf src) throws IOException {
int copied = 0;
- while (src.remaining() > 0) {
- int truncated = 0;
- if (writeBuffer.remaining() < src.remaining()) {
- truncated = src.remaining() - writeBuffer.remaining();
- src.limit(src.limit() - truncated);
- }
- copied += src.remaining();
- writeBuffer.put(src);
- src.limit(src.limit() + truncated);
+ int len = src.readableBytes();
+ while (copied < len) {
+ int bytesToCopy = Math.min(src.readableBytes() - copied,
writeBuffer.writableBytes());
+ writeBuffer.writeBytes(src, src.readerIndex() + copied,
bytesToCopy);
+ copied += bytesToCopy;
+
// if we have run out of buffer space, we should flush to the file
- if (writeBuffer.remaining() == 0) {
+ if (!writeBuffer.isWritable()) {
flushInternal();
}
}
@@ -121,10 +124,10 @@ public class BufferedChannel extends BufferedReadChannel {
* @throws IOException if the write fails.
*/
private void flushInternal() throws IOException {
- writeBuffer.flip();
+ ByteBuffer toWrite = writeBuffer.internalNioBuffer(0,
writeBuffer.writerIndex());
do {
- fileChannel.write(writeBuffer);
- } while (writeBuffer.hasRemaining());
+ fileChannel.write(toWrite);
+ } while (toWrite.hasRemaining());
writeBuffer.clear();
writeBufferStartPosition.set(fileChannel.position());
}
@@ -140,57 +143,41 @@ public class BufferedChannel extends BufferedReadChannel {
}
@Override
- public synchronized int read(ByteBuffer dest, long pos) throws IOException
{
+ public synchronized int read(ByteBuf dest, long pos, int length) throws
IOException {
long prevPos = pos;
- while (dest.remaining() > 0) {
+ while (length > 0) {
// check if it is in the write buffer
if (writeBuffer != null && writeBufferStartPosition.get() <= pos) {
- long positionInBuffer = pos - writeBufferStartPosition.get();
- long bytesToCopy = writeBuffer.position() - positionInBuffer;
- if (bytesToCopy > dest.remaining()) {
- bytesToCopy = dest.remaining();
- }
+ int positionInBuffer = (int) (pos -
writeBufferStartPosition.get());
+ int bytesToCopy = Math.min(writeBuffer.writerIndex() -
positionInBuffer, dest.writableBytes());
+
if (bytesToCopy == 0) {
throw new IOException("Read past EOF");
}
- ByteBuffer src = writeBuffer.duplicate();
- src.position((int) positionInBuffer);
- src.limit((int) (positionInBuffer + bytesToCopy));
- dest.put(src);
+
+ dest.writeBytes(writeBuffer, positionInBuffer, bytesToCopy);
pos += bytesToCopy;
+ length -= bytesToCopy;
} else if (writeBuffer == null && writeBufferStartPosition.get()
<= pos) {
// here we reach the end
break;
// first check if there is anything we can grab from the
readBuffer
- } else if (readBufferStartPosition <= pos && pos <
readBufferStartPosition + readBuffer.capacity()) {
- long positionInBuffer = pos - readBufferStartPosition;
- long bytesToCopy = readBuffer.capacity() - positionInBuffer;
- if (bytesToCopy > dest.remaining()) {
- bytesToCopy = dest.remaining();
- }
- ByteBuffer src = readBuffer.duplicate();
- src.position((int) positionInBuffer);
- src.limit((int) (positionInBuffer + bytesToCopy));
- dest.put(src);
+ } else if (readBufferStartPosition <= pos && pos <
readBufferStartPosition + readBuffer.writerIndex()) {
+ int positionInBuffer = (int) (pos - readBufferStartPosition);
+ int bytesToCopy = Math.min(readBuffer.writerIndex() -
positionInBuffer, dest.writableBytes());
+ dest.writeBytes(readBuffer, positionInBuffer, bytesToCopy);
pos += bytesToCopy;
+ length -= bytesToCopy;
// let's read it
} else {
readBufferStartPosition = pos;
- readBuffer.clear();
- // make sure that we don't overlap with the write buffer
- if (readBufferStartPosition + readBuffer.capacity() >=
writeBufferStartPosition.get()) {
- readBufferStartPosition = writeBufferStartPosition.get() -
readBuffer.capacity();
- if (readBufferStartPosition < 0) {
- ZeroBuffer.put(readBuffer, (int)
-readBufferStartPosition);
- }
- }
- while (readBuffer.remaining() > 0) {
- if (fileChannel.read(readBuffer, readBufferStartPosition +
readBuffer.position()) <= 0) {
- throw new IOException("Short read");
- }
+
+ int readBytes =
fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity),
+ readBufferStartPosition);
+ if (readBytes <= 0) {
+ throw new IOException("Reading from filechannel returned a
non-positive value. Short read.");
}
- ZeroBuffer.put(readBuffer);
- readBuffer.clear();
+ readBuffer.writerIndex(readBytes);
}
}
return (int) (pos - prevPos);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
index ef4d6f3..87e1d43 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
/**
- * A {@code BufferedChannelBase} adds functionlity to an existing file
channel, the ability
+ * A {@code BufferedChannelBase} adds functionality to an existing file
channel, the ability
* to buffer the input and output data. This class is a base class for
wrapping the {@link FileChannel}.
*/
public abstract class BufferedChannelBase {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
index 64557d1..96dea6f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
@@ -21,19 +21,24 @@
package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* A Buffered channel without a write buffer. Only reads are buffered.
*/
-public class BufferedReadChannel extends BufferedChannelBase {
+public class BufferedReadChannel extends BufferedChannelBase {
// The capacity of the read buffer.
protected final int readCapacity;
+
// The buffer for read operations.
- protected ByteBuffer readBuffer;
+ protected final ByteBuf readBuffer;
+
// The starting position of the data currently in the read buffer.
protected long readBufferStartPosition = Long.MIN_VALUE;
@@ -43,8 +48,7 @@ public class BufferedReadChannel extends BufferedChannelBase {
public BufferedReadChannel(FileChannel fileChannel, int readCapacity)
throws IOException {
super(fileChannel);
this.readCapacity = readCapacity;
- this.readBuffer = ByteBuffer.allocateDirect(readCapacity);
- this.readBuffer.limit(0);
+ this.readBuffer = Unpooled.buffer(readCapacity);
}
/**
@@ -57,7 +61,11 @@ public class BufferedReadChannel extends BufferedChannelBase
{
* -1 if the given position is greater than or equal to the file's
current size.
* @throws IOException if I/O error occurs
*/
- public synchronized int read(ByteBuffer dest, long pos) throws IOException
{
+ public int read(ByteBuf dest, long pos) throws IOException {
+ return read(dest, pos, dest.writableBytes());
+ }
+
+ public synchronized int read(ByteBuf dest, long pos, int length) throws
IOException {
invocationCount++;
long currentPosition = pos;
long eof = validateAndGetFileChannel().size();
@@ -65,30 +73,28 @@ public class BufferedReadChannel extends
BufferedChannelBase {
if (pos >= eof) {
return -1;
}
- while (dest.remaining() > 0) {
+ while (length > 0) {
// Check if the data is in the buffer, if so, copy it.
if (readBufferStartPosition <= currentPosition
- && currentPosition < readBufferStartPosition +
readBuffer.limit()) {
- long posInBuffer = currentPosition - readBufferStartPosition;
- long bytesToCopy = Math.min(dest.remaining(),
readBuffer.limit() - posInBuffer);
- ByteBuffer rbDup = readBuffer.duplicate();
- rbDup.position((int) posInBuffer);
- rbDup.limit((int) (posInBuffer + bytesToCopy));
- dest.put(rbDup);
+ && currentPosition < readBufferStartPosition +
readBuffer.readableBytes()) {
+ int posInBuffer = (int) (currentPosition -
readBufferStartPosition);
+ int bytesToCopy = Math.min(length, readBuffer.readableBytes()
- posInBuffer);
+ dest.writeBytes(readBuffer, posInBuffer, bytesToCopy);
currentPosition += bytesToCopy;
+ length -= bytesToCopy;
cacheHitCount++;
} else if (currentPosition >= eof) {
// here we reached eof.
break;
} else {
// We don't have it in the buffer, so put necessary data in
the buffer
- readBuffer.clear();
readBufferStartPosition = currentPosition;
int readBytes = 0;
- if ((readBytes = validateAndGetFileChannel().read(readBuffer,
currentPosition)) <= 0) {
+ if ((readBytes =
validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
+ currentPosition)) <= 0) {
throw new IOException("Reading from filechannel returned a
non-positive value. Short read.");
}
- readBuffer.limit(readBytes);
+ readBuffer.writerIndex(readBytes);
}
}
return (int) (currentPosition - pos);
@@ -96,7 +102,6 @@ public class BufferedReadChannel extends BufferedChannelBase
{
public synchronized void clear() {
readBuffer.clear();
- readBuffer.limit(0);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index c31b989..88e22bf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -21,8 +21,9 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -85,15 +86,13 @@ public class EntryLogCompactor extends AbstractLogCompactor
{
}
@Override
- public void process(final long ledgerId, long offset,
ByteBuffer entry) throws IOException {
- throttler.acquire(entry.remaining());
+ public void process(final long ledgerId, long offset, ByteBuf
entry) throws IOException {
+ throttler.acquire(entry.readableBytes());
if (offsets.size() > maxOutstandingRequests) {
flush();
}
- entry.getLong(); // discard ledger id, we already have it
- long entryId = entry.getLong();
- entry.rewind();
+ long entryId = entry.getLong(entry.readerIndex() + 8);
long newoffset = entryLogger.addEntry(ledgerId, entry);
offsets.add(new EntryLocation(ledgerId, entryId,
newoffset));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index c00c84c..d229151 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -28,9 +28,10 @@ import static
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
import com.google.common.collect.MapMaker;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.FastThreadLocal;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -60,7 +61,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -164,7 +164,7 @@ public class EntryLogger {
* </pre>
*/
static final int LOGFILE_HEADER_SIZE = 1024;
- final ByteBuffer logfileHeader = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+ final ByteBuf logfileHeader = Unpooled.buffer(LOGFILE_HEADER_SIZE);
static final int HEADER_VERSION_POSITION = 4;
static final int LEDGERS_MAP_OFFSET_POSITION = HEADER_VERSION_POSITION + 4;
@@ -222,10 +222,10 @@ public class EntryLogger {
* @param offset
* File offset of this entry.
* @param entry
- * Entry ByteBuffer
+ * Entry ByteBuf
* @throws IOException
*/
- void process(long ledgerId, long offset, ByteBuffer entry) throws
IOException;
+ void process(long ledgerId, long offset, ByteBuf entry) throws
IOException;
}
/**
@@ -265,8 +265,9 @@ public class EntryLogger {
// within the same JVM. All of these Bookie instances access this
header
// so there can be race conditions when entry logs are rolled over and
// this header buffer is cleared before writing it into the new
logChannel.
- logfileHeader.put("BKLO".getBytes(UTF_8));
- logfileHeader.putInt(HEADER_CURRENT_VERSION);
+ logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
+ logfileHeader.writeInt(HEADER_CURRENT_VERSION);
+ logfileHeader.writerIndex(LOGFILE_HEADER_SIZE);
// Find the largest logId
long logId = INVALID_LID;
@@ -307,13 +308,13 @@ public class EntryLogger {
* @param pos The starting position from where we want to read.
* @return
*/
- private int readFromLogChannel(long entryLogId, BufferedReadChannel
channel, ByteBuffer buff, long pos)
+ private int readFromLogChannel(long entryLogId, BufferedReadChannel
channel, ByteBuf buff, long pos)
throws IOException {
BufferedLogChannel bc = logChannel;
if (null != bc) {
if (entryLogId == bc.getLogId()) {
synchronized (bc) {
- if (pos + buff.remaining() >= bc.getFileChannelPosition())
{
+ if (pos + buff.writableBytes() >=
bc.getFileChannelPosition()) {
return bc.read(buff, pos);
}
}
@@ -513,9 +514,9 @@ public class EntryLogger {
int numberOfLedgers = (int) ledgersMap.size();
// Write the ledgers map into several batches
- final AtomicLong currentOffset = new AtomicLong(ledgerMapOffset);
+
final int maxMapSize = LEDGERS_MAP_HEADER_SIZE +
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
- final ByteBuffer serializedMap = ByteBuffer.allocate(maxMapSize);
+ final ByteBuf serializedMap =
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
try {
ledgersMap.forEach(new BiConsumerLong() {
@@ -530,25 +531,23 @@ public class EntryLogger {
int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE +
LEDGERS_MAP_ENTRY_SIZE * batchSize;
serializedMap.clear();
- serializedMap.putInt(ledgerMapSize - 4);
- serializedMap.putLong(INVALID_LID);
- serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
- serializedMap.putInt(batchSize);
+ serializedMap.writeInt(ledgerMapSize - 4);
+ serializedMap.writeLong(INVALID_LID);
+ serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+ serializedMap.writeInt(batchSize);
startNewBatch = false;
remainingInBatch = batchSize;
}
// Dump the ledger in the current batch
- serializedMap.putLong(ledgerId);
- serializedMap.putLong(size);
+ serializedMap.writeLong(ledgerId);
+ serializedMap.writeLong(size);
--remainingLedgers;
if (--remainingInBatch == 0) {
// Close current batch
- serializedMap.flip();
try {
- int written =
entryLogChannel.fileChannel.write(serializedMap, currentOffset.get());
- currentOffset.addAndGet(written);
+ entryLogChannel.write(serializedMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -563,6 +562,8 @@ public class EntryLogger {
} else {
throw e;
}
+ } finally {
+ serializedMap.release();
}
// Update the headers with the map offset and count of ledgers
@@ -662,7 +663,8 @@ public class EntryLogger {
FileChannel channel = new RandomAccessFile(newLogFile,
"rw").getChannel();
BufferedLogChannel logChannel = new BufferedLogChannel(channel,
conf.getWriteBufferBytes(), conf.getReadBufferBytes(),
preallocatedLogId, newLogFile);
- logChannel.write((ByteBuffer) logfileHeader.clear());
+ logfileHeader.readerIndex(0);
+ logChannel.write(logfileHeader);
for (File f : list) {
setLastLogId(f, preallocatedLogId);
@@ -851,11 +853,22 @@ public class EntryLogger {
}
long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
+ }
+
+ long addEntry(long ledger, ByteBuf entry) throws IOException {
return addEntry(ledger, entry, true);
}
- synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog)
throws IOException {
- int entrySize = entry.remaining() + 4;
+ private final FastThreadLocal<ByteBuf> sizeBuffer = new
FastThreadLocal<ByteBuf>() {
+ @Override
+ protected ByteBuf initialValue() throws Exception {
+ return Unpooled.buffer(4);
+ }
+ };
+
+ synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog)
throws IOException {
+ int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to
prepend the size
boolean reachEntryLogLimit =
rollLog ? reachEntryLogLimit(entrySize) :
readEntryLogHardLimit(entrySize);
// Create new log if logSizeLimit reached or current disk is full
@@ -871,12 +884,11 @@ public class EntryLogger {
}
}
- // Get a buffer from recyclable pool to store the size
- RecyclableByteBuffer recyclableBuffer = RecyclableByteBuffer.get();
- recyclableBuffer.buffer.putInt(entry.remaining());
- recyclableBuffer.buffer.flip();
- logChannel.write(recyclableBuffer.buffer);
- recyclableBuffer.recycle();
+ // Get a buffer from thread local to store the size
+ ByteBuf sizeBuffer = this.sizeBuffer.get();
+ sizeBuffer.clear();
+ sizeBuffer.writeInt(entry.readableBytes());
+ logChannel.write(sizeBuffer);
long pos = logChannel.position();
logChannel.write(entry);
@@ -887,16 +899,18 @@ public class EntryLogger {
return (logChannel.getLogId() << 32L) | pos;
}
- long addEntryForCompaction(long ledgerId, ByteBuffer entry) throws
IOException {
+ long addEntryForCompaction(long ledgerId, ByteBuf entry) throws
IOException {
synchronized (compactionLogLock) {
- int entrySize = entry.remaining() + 4;
+ int entrySize = entry.readableBytes() + 4;
if (compactionLogChannel == null) {
createNewCompactionLog();
}
- ByteBuffer buff = ByteBuffer.allocate(4);
- buff.putInt(entry.remaining());
- buff.flip();
- compactionLogChannel.write(buff);
+
+ ByteBuf sizeBuffer = this.sizeBuffer.get();
+ sizeBuffer.clear();
+ sizeBuffer.writeInt(entry.readableBytes());
+ compactionLogChannel.write(sizeBuffer);
+
long pos = compactionLogChannel.position();
compactionLogChannel.write(entry);
compactionLogChannel.registerWrittenEntry(ledgerId, entrySize);
@@ -950,32 +964,6 @@ public class EntryLogger {
}
- private static final class RecyclableByteBuffer {
- private static final Recycler<RecyclableByteBuffer> RECYCLER = new
Recycler<RecyclableByteBuffer>() {
- @Override
- protected RecyclableByteBuffer
newObject(Handle<RecyclableByteBuffer> handle) {
- return new RecyclableByteBuffer(handle);
- }
- };
-
- private final ByteBuffer buffer;
- private final Handle<RecyclableByteBuffer> handle;
- public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) {
- this.buffer = ByteBuffer.allocate(4);
- this.handle = handle;
- }
-
- public static RecyclableByteBuffer get() {
- return RECYCLER.get();
- }
-
- public void recycle() {
- buffer.rewind();
- handle.recycle(this);
- }
- }
-
-
private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws
IOException {
if (!doRegularFlushes) {
return;
@@ -1001,7 +989,8 @@ public class EntryLogger {
ByteBuf readEntry(long ledgerId, long entryId, long location) throws
IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
long pos = location & 0xffffffffL;
- RecyclableByteBuffer sizeBuff = RecyclableByteBuffer.get();
+ ByteBuf sizeBuff = sizeBuffer.get();
+ sizeBuff.clear();
pos -= 4; // we want to get the ledgerId and length to check
BufferedReadChannel fc;
try {
@@ -1013,14 +1002,12 @@ public class EntryLogger {
throw newe;
}
- if (readFromLogChannel(entryLogId, fc, sizeBuff.buffer, pos) !=
sizeBuff.buffer.capacity()) {
+ if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) !=
sizeBuff.capacity()) {
throw new Bookie.NoEntryException("Short read from entrylog " +
entryLogId,
ledgerId, entryId);
}
pos += 4;
- sizeBuff.buffer.flip();
- int entrySize = sizeBuff.buffer.getInt();
- sizeBuff.recycle();
+ int entrySize = sizeBuff.readInt();
// entrySize does not include the ledgerId
if (entrySize > maxSaneEntrySize) {
@@ -1033,7 +1020,7 @@ public class EntryLogger {
}
ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entrySize,
entrySize);
- int rc = readFromLogChannel(entryLogId, fc, data.nioBuffer(0,
entrySize), pos);
+ int rc = readFromLogChannel(entryLogId, fc, data, pos);
if (rc != entrySize) {
// Note that throwing NoEntryException here instead of IOException
is not
// without risk. If all bookies in a quorum throw this same
exception
@@ -1042,6 +1029,7 @@ public class EntryLogger {
// could have occurred, where the length of the entry was
corrupted on all
// replicas. However, the chance of this happening is very very
low, so
// returning NoEntryException is mostly safe.
+ data.release();
throw new Bookie.NoEntryException("Short read for " + ledgerId +
"@"
+ entryId + " in " + entryLogId
+ "@"
+ pos + "(" + rc + "!=" +
entrySize + ")", ledgerId, entryId);
@@ -1049,11 +1037,13 @@ public class EntryLogger {
data.writerIndex(entrySize);
long thisLedgerId = data.getLong(0);
if (thisLedgerId != ledgerId) {
+ data.release();
throw new IOException("problem found in " + entryLogId + "@" +
entryId + " at position + " + pos
+ " entry belongs to " + thisLedgerId + " not " +
ledgerId);
}
long thisEntryId = data.getLong(8);
if (thisEntryId != entryId) {
+ data.release();
throw new IOException("problem found in " + entryLogId + "@" +
entryId + " at position + " + pos
+ " entry is " + thisEntryId + " not " + entryId);
}
@@ -1068,21 +1058,24 @@ public class EntryLogger {
BufferedReadChannel bc = getChannelForLogId(entryLogId);
// Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
- ByteBuffer headers = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
- bc.read(headers, 0);
- headers.flip();
+ ByteBuf headers =
PooledByteBufAllocator.DEFAULT.directBuffer(LOGFILE_HEADER_SIZE);
+ try {
+ bc.read(headers, 0);
- // Skip marker string "BKLO"
- headers.getInt();
+ // Skip marker string "BKLO"
+ headers.readInt();
- int headerVersion = headers.getInt();
- if (headerVersion < HEADER_V0 || headerVersion >
HEADER_CURRENT_VERSION) {
- LOG.info("Unknown entry log header version for log {}: {}",
entryLogId, headerVersion);
- }
+ int headerVersion = headers.readInt();
+ if (headerVersion < HEADER_V0 || headerVersion >
HEADER_CURRENT_VERSION) {
+ LOG.info("Unknown entry log header version for log {}: {}",
entryLogId, headerVersion);
+ }
- long ledgersMapOffset = headers.getLong();
- int ledgersCount = headers.getInt();
- return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+ long ledgersMapOffset = headers.readLong();
+ int ledgersCount = headers.readInt();
+ return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+ } finally {
+ headers.release();
+ }
}
private BufferedReadChannel getChannelForLogId(long entryLogId) throws
IOException {
@@ -1137,8 +1130,8 @@ public class EntryLogger {
* @throws IOException
*/
protected void scanEntryLog(long entryLogId, EntryLogScanner scanner)
throws IOException {
- ByteBuffer sizeBuff = ByteBuffer.allocate(4);
- ByteBuffer lidBuff = ByteBuffer.allocate(8);
+ // Buffer where to read the entrySize (4 bytes) and the ledgerId (8
bytes)
+ ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
BufferedReadChannel bc;
// Get the BufferedChannel for the current entry log file
try {
@@ -1151,49 +1144,50 @@ public class EntryLogger {
// the header where all of the ledger entries are.
long pos = LOGFILE_HEADER_SIZE;
- // Read through the entry log file and extract the ledger ID's.
- while (true) {
- // Check if we've finished reading the entry log file.
- if (pos >= bc.size()) {
- break;
- }
- if (readFromLogChannel(entryLogId, bc, sizeBuff, pos) !=
sizeBuff.capacity()) {
- LOG.warn("Short read for entry size from entrylog {}",
entryLogId);
- return;
- }
- long offset = pos;
- pos += 4;
- sizeBuff.flip();
- int entrySize = sizeBuff.getInt();
-
- sizeBuff.clear();
- // try to read ledger id first
- if (readFromLogChannel(entryLogId, bc, lidBuff, pos) !=
lidBuff.capacity()) {
- LOG.warn("Short read for ledger id from entrylog {}",
entryLogId);
- return;
- }
- lidBuff.flip();
- long lid = lidBuff.getLong();
- lidBuff.clear();
- if (lid == INVALID_LID || !scanner.accept(lid)) {
- // skip this entry
+ // Start with a reasonably sized buffer size
+ ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(1024 *
1024);
+
+ try {
+
+ // Read through the entry log file and extract the ledger ID's.
+ while (true) {
+ // Check if we've finished reading the entry log file.
+ if (pos >= bc.size()) {
+ break;
+ }
+ if (readFromLogChannel(entryLogId, bc, headerBuffer, pos) !=
headerBuffer.capacity()) {
+ LOG.warn("Short read for entry size from entrylog {}",
entryLogId);
+ return;
+ }
+ long offset = pos;
+ pos += 4;
+ int entrySize = headerBuffer.readInt();
+ long ledgerId = headerBuffer.readLong();
+ headerBuffer.clear();
+
+ if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
+ // skip this entry
+ pos += entrySize;
+ continue;
+ }
+ // read the entry
+
+ data.clear();
+ data.capacity(entrySize);
+ int rc = readFromLogChannel(entryLogId, bc, data, pos);
+ if (rc != entrySize) {
+ LOG.warn("Short read for ledger entry from entryLog {}@{}
({} != {})",
+ new Object[] { entryLogId, pos, rc, entrySize });
+ return;
+ }
+ // process the entry
+ scanner.process(ledgerId, offset, data);
+
+ // Advance position to the next entry
pos += entrySize;
- continue;
- }
- // read the entry
- byte data[] = new byte[entrySize];
- ByteBuffer buff = ByteBuffer.wrap(data);
- int rc = readFromLogChannel(entryLogId, bc, buff, pos);
- if (rc != data.length) {
- LOG.warn("Short read for ledger entry from entryLog {}@{} ({}
!= {})", new Object[] { entryLogId, pos,
- rc, data.length });
- return;
}
- buff.flip();
- // process the entry
- scanner.process(lid, offset, buff);
- // Advance position to the next entry
- pos += entrySize;
+ } finally {
+ data.release();
}
}
@@ -1232,53 +1226,55 @@ public class EntryLogger {
long offset = header.ledgersMapOffset;
EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
- while (offset < bc.size()) {
- // Read ledgers map size
- ByteBuffer sizeBuf = ByteBuffer.allocate(4);
- bc.read(sizeBuf, offset);
- sizeBuf.flip();
+ final int maxMapSize = LEDGERS_MAP_HEADER_SIZE +
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+ ByteBuf ledgersMap = ByteBufAllocator.DEFAULT.directBuffer(maxMapSize);
- int ledgersMapSize = sizeBuf.getInt();
+ try {
+ while (offset < bc.size()) {
+ // Read ledgers map size
+ sizeBuffer.get().clear();
+ bc.read(sizeBuffer.get(), offset);
- // Read the index into a buffer
- ByteBuffer ledgersMapBuffer = ByteBuffer.allocate(ledgersMapSize);
- bc.read(ledgersMapBuffer, offset + 4);
- ledgersMapBuffer.flip();
+ int ledgersMapSize = sizeBuffer.get().readInt();
- // Discard ledgerId and entryId
- long lid = ledgersMapBuffer.getLong();
- if (lid != INVALID_LID) {
- throw new IOException("Cannot deserialize ledgers map from
ledger " + lid + " -- entryLogId: "
- + entryLogId);
- }
+ // Read the index into a buffer
+ ledgersMap.clear();
+ bc.read(ledgersMap, offset + 4, ledgersMapSize);
- long entryId = ledgersMapBuffer.getLong();
- if (entryId != LEDGERS_MAP_ENTRY_ID) {
- throw new IOException("Cannot deserialize ledgers map from
ledger " + lid + ":" + entryId
- + " -- entryLogId: " + entryLogId);
- }
+ // Discard ledgerId and entryId
+ long lid = ledgersMap.readLong();
+ if (lid != INVALID_LID) {
+ throw new IOException("Cannot deserialize ledgers map from
ledger " + lid);
+ }
+
+ long entryId = ledgersMap.readLong();
+ if (entryId != LEDGERS_MAP_ENTRY_ID) {
+ throw new IOException("Cannot deserialize ledgers map from
entryId " + entryId);
+ }
- // Read the number of ledgers in the current entry batch
- int ledgersCount = ledgersMapBuffer.getInt();
+ // Read the number of ledgers in the current entry batch
+ int ledgersCount = ledgersMap.readInt();
- // Extract all (ledger,size) tuples from buffer
- for (int i = 0; i < ledgersCount; i++) {
- long ledgerId = ledgersMapBuffer.getLong();
- long size = ledgersMapBuffer.getLong();
+ // Extract all (ledger,size) tuples from buffer
+ for (int i = 0; i < ledgersCount; i++) {
+ long ledgerId = ledgersMap.readLong();
+ long size = ledgersMap.readLong();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recovering ledgers maps for log {} -- Found
ledger: {} with size: {}",
- new Object[] { entryLogId, ledgerId, size });
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovering ledgers maps for log {} -- Found
ledger: {} with size: {}",
+ new Object[] { entryLogId, ledgerId, size });
+ }
+ meta.addLedgerSize(ledgerId, size);
+ }
+ if (ledgersMap.isReadable()) {
+ throw new IOException("Invalid entry size when reading
ledgers map");
}
- meta.addLedgerSize(ledgerId, size);
- }
- if (ledgersMapBuffer.hasRemaining()) {
- throw new IOException("Invalid entry size when reading ledgers
map on entryLogId: " + entryLogId);
+ // Move to next entry, if any
+ offset += ledgersMapSize + 4;
}
-
- // Move to next entry, if any
- offset += ledgersMapSize + 4;
+ } finally {
+ ledgersMap.release();
}
if (meta.getLedgersMap().size() != header.ledgersCount) {
@@ -1295,9 +1291,9 @@ public class EntryLogger {
// Read through the entry log file and extract the entry log meta
scanEntryLog(entryLogId, new EntryLogScanner() {
@Override
- public void process(long ledgerId, long offset, ByteBuffer entry)
throws IOException {
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
// add new entry size of a ledger to entry log meta
- meta.addLedgerSize(ledgerId, entry.limit() + 4);
+ meta.addLedgerSize(ledgerId, entry.readableBytes() + 4);
}
@Override
@@ -1351,6 +1347,7 @@ public class EntryLogger {
if (null == channel) {
return;
}
+
FileChannel fileChannel = channel.getFileChannel();
if (null != fileChannel) {
fileChannel.close();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index c8f9483..50e93a3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -248,7 +248,7 @@ public class EntryMemTable {
ledger = kv.getLedgerId();
if (ledgerGC != ledger) {
try {
- flusher.process(ledger, kv.getEntryId(),
kv.getValueAsByteBuffer().nioBuffer());
+ flusher.process(ledger, kv.getEntryId(),
kv.getValueAsByteBuffer());
} catch (NoLedgerException exception) {
ledgerGC = ledger;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6db3e6d..2b357ee 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -26,16 +26,18 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENT
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
import com.google.common.collect.Lists;
+
import io.netty.buffer.ByteBuf;
+
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -263,7 +265,7 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
long entryId = entry.getLong(entry.readerIndex() + 8);
long lac = entry.getLong(entry.readerIndex() + 16);
- processEntry(ledgerId, entryId, entry.nioBuffer());
+ processEntry(ledgerId, entryId, entry);
ledgerCache.updateLastAddConfirmed(ledgerId, lac);
return entryId;
@@ -409,11 +411,11 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
ledgerDeletionListeners.add(listener);
}
- protected void processEntry(long ledgerId, long entryId, ByteBuffer entry)
throws IOException {
+ protected void processEntry(long ledgerId, long entryId, ByteBuf entry)
throws IOException {
processEntry(ledgerId, entryId, entry, true);
}
- protected synchronized void processEntry(long ledgerId, long entryId,
ByteBuffer entry, boolean rollLog)
+ protected synchronized void processEntry(long ledgerId, long entryId,
ByteBuf entry, boolean rollLog)
throws IOException {
/*
* Touch dirty flag
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d3c2144..faac1b4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.bookie;
import com.google.common.base.Stopwatch;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
@@ -39,6 +40,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -49,7 +51,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZeroBuffer;
import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
import org.apache.bookkeeper.util.collections.RecyclableArrayList;
import org.slf4j.Logger;
@@ -291,7 +292,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
static QueueEntry create(ByteBuf entry, long ledgerId, long entryId,
WriteCallback cb, Object ctx,
long enqueueTime, OpStatsLogger journalAddEntryStats) {
QueueEntry qe = RECYCLER.get();
- qe.entry = entry.duplicate();
+ qe.entry = entry;
qe.cb = cb;
qe.ctx = ctx;
qe.ledgerId = ledgerId;
@@ -508,7 +509,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
static final int PADDING_MASK = -0x100;
- static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer,
int journalAlignSize)
+ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer,
int journalAlignSize)
throws IOException {
int bytesToAlign = (int) (jc.bc.position() % journalAlignSize);
if (0 != bytesToAlign) {
@@ -520,14 +521,13 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
paddingBuffer.clear();
// padding mask
- paddingBuffer.putInt(PADDING_MASK);
+ paddingBuffer.writeInt(PADDING_MASK);
// padding len
- paddingBuffer.putInt(paddingBytes);
+ paddingBuffer.writeInt(paddingBytes);
// padding bytes
- paddingBuffer.position(8 + paddingBytes);
+ paddingBuffer.writerIndex(paddingBuffer.writerIndex() +
paddingBytes);
- paddingBuffer.flip();
- jc.preAllocIfNeeded(paddingBuffer.limit());
+ jc.preAllocIfNeeded(paddingBuffer.readableBytes());
// write padding bytes
jc.bc.write(paddingBuffer);
}
@@ -860,9 +860,9 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
LOG.info("Starting journal on {}", journalDirectory);
RecyclableArrayList<QueueEntry> toFlush =
entryListRecycler.newInstance();
- ByteBuffer lenBuff = ByteBuffer.allocate(4);
- ByteBuffer paddingBuff = ByteBuffer.allocate(2 *
conf.getJournalAlignmentSize());
- ZeroBuffer.put(paddingBuff);
+ ByteBuf lenBuff = Unpooled.buffer(4);
+ ByteBuf paddingBuff = Unpooled.buffer(2 *
conf.getJournalAlignmentSize());
+ paddingBuff.writeZero(paddingBuff.capacity());
final int journalFormatVersionToWrite =
conf.getJournalFormatVersionToWrite();
final int journalAlignmentSize = conf.getJournalAlignmentSize();
JournalChannel logFile = null;
@@ -1020,24 +1020,20 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
continue;
}
- journalWriteBytes.add(qe.entry.readableBytes());
+ int entrySize = qe.entry.readableBytes();
+ journalWriteBytes.add(entrySize);
journalQueueSize.dec();
- batchSize += (4 + qe.entry.readableBytes());
+ batchSize += (4 + entrySize);
lenBuff.clear();
- lenBuff.putInt(qe.entry.readableBytes());
- lenBuff.flip();
+ lenBuff.writeInt(entrySize);
// preAlloc based on size
- logFile.preAllocIfNeeded(4 + qe.entry.readableBytes());
+ logFile.preAllocIfNeeded(4 + entrySize);
- //
- // we should be doing the following, but then we run out of
- // direct byte buffers
- // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
bc.write(lenBuff);
- bc.write(qe.entry.nioBuffer());
+ bc.write(qe.entry);
qe.entry.release();
toFlush.add(qe);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
index 96fbbe1..abfb264 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
@@ -21,8 +21,9 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
/**
* Flush entries from skip list.
@@ -36,5 +37,5 @@ public interface SkipListFlusher {
* @param entry Entry ByteBuffer
* @throws IOException
*/
- void process(long ledgerId, long entryId, ByteBuffer entry) throws
IOException;
+ void process(long ledgerId, long entryId, ByteBuf entry) throws
IOException;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 972f085..b0c6bad 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -177,7 +176,7 @@ public class SortedLedgerStorage extends
InterleavedLedgerStorage
@Override
public void process(long ledgerId, long entryId,
- ByteBuffer buffer) throws IOException {
+ ByteBuf buffer) throws IOException {
processEntry(ledgerId, entryId, buffer, false);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index e5fef35..67c1106 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -21,9 +21,10 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -172,18 +173,17 @@ public class TransactionalEntryLogCompactor extends
AbstractLogCompactor {
}
@Override
- public void process(long ledgerId, long offset, ByteBuffer
entry) throws IOException {
- throttler.acquire(entry.remaining());
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
+ throttler.acquire(entry.readableBytes());
synchronized (TransactionalEntryLogCompactor.this) {
- long lid = entry.getLong();
- long entryId = entry.getLong();
+ long lid = entry.getLong(entry.readerIndex());
+ long entryId = entry.getLong(entry.readerIndex() + 8);
if (lid != ledgerId || entryId < -1) {
LOG.warn("Scanning expected ledgerId {}, but found
invalid entry "
+ "with ledgerId {} entryId {} at offset
{}",
new Object[]{ledgerId, lid, entryId, offset});
throw new IOException("Invalid entry found @
offset " + offset);
}
- entry.rewind();
long newOffset =
entryLogger.addEntryForCompaction(ledgerId, entry);
offsets.add(new EntryLocation(ledgerId, entryId,
newOffset));
@@ -356,16 +356,15 @@ public class TransactionalEntryLogCompactor extends
AbstractLogCompactor {
}
@Override
- public void process(long ledgerId, long offset, ByteBuffer
entry) throws IOException {
- long lid = entry.getLong();
- long entryId = entry.getLong();
+ public void process(long ledgerId, long offset, ByteBuf entry)
throws IOException {
+ long lid = entry.getLong(entry.readerIndex());
+ long entryId = entry.getLong(entry.readerIndex() + 8);
if (lid != ledgerId || entryId < -1) {
LOG.warn("Scanning expected ledgerId {}, but found
invalid entry "
+ "with ledgerId {} entryId {} at offset {}",
new Object[]{ledgerId, lid, entryId, offset});
throw new IOException("Invalid entry found @ offset "
+ offset);
}
- entry.rewind();
long location = (compactedLogId << 32L) | (offset + 4);
offsets.add(new EntryLocation(lid, entryId, location));
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index d3ae746..e55dca2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -121,11 +121,6 @@ public final class DoubleByteBuf extends
AbstractReferenceCountedByteBuf {
}
@Override
- public int readableBytes() {
- return b1.readableBytes() + b2.readableBytes();
- }
-
- @Override
public int writableBytes() {
return 0;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 8332b34..6f75bbe 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -20,6 +20,11 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -29,8 +34,8 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
-import java.util.List;
import java.util.Arrays;
+import java.util.List;
import java.util.Random;
import org.apache.bookkeeper.client.ClientUtil;
@@ -38,14 +43,11 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.ZeroBuffer;
import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.junit.Test;
-import org.junit.After;
-
-import static org.junit.Assert.*;
public class BookieJournalTest {
private final static Logger LOG =
LoggerFactory.getLogger(BookieJournalTest.class);
@@ -105,11 +107,10 @@ public class BookieJournalTest {
/**
* Generate fence entry
*/
- private ByteBuffer generateFenceEntry(long ledgerId) {
- ByteBuffer bb = ByteBuffer.allocate(8 + 8);
- bb.putLong(ledgerId);
- bb.putLong(Bookie.METAENTRY_ID_FENCE_KEY);
- bb.flip();
+ private ByteBuf generateFenceEntry(long ledgerId) {
+ ByteBuf bb = Unpooled.buffer();
+ bb.writeLong(ledgerId);
+ bb.writeLong(Bookie.METAENTRY_ID_FENCE_KEY);
return bb;
}
@@ -117,13 +118,12 @@ public class BookieJournalTest {
* Generate meta entry with given master key
*/
private ByteBuf generateMetaEntry(long ledgerId, byte[] masterKey) {
- ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
- bb.putLong(ledgerId);
- bb.putLong(Bookie.METAENTRY_ID_LEDGER_KEY);
- bb.putInt(masterKey.length);
- bb.put(masterKey);
- bb.flip();
- return Unpooled.wrappedBuffer(bb);
+ ByteBuf bb = Unpooled.buffer();
+ bb.writeLong(ledgerId);
+ bb.writeLong(Bookie.METAENTRY_ID_LEDGER_KEY);
+ bb.writeInt(masterKey.length);
+ bb.writeBytes(masterKey);
+ return bb;
}
private void writeJunkJournal(File journalDir) throws Exception {
@@ -204,8 +204,8 @@ public class BookieJournalTest {
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
- bc.write(lenBuff);
- bc.write(packet.nioBuffer());
+ bc.write(Unpooled.wrappedBuffer(lenBuff));
+ bc.write(packet);
packet.release();
}
bc.flush(true);
@@ -238,8 +238,8 @@ public class BookieJournalTest {
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
- bc.write(lenBuff);
- bc.write(packet.nioBuffer());
+ bc.write(Unpooled.wrappedBuffer(lenBuff));
+ bc.write(packet);
packet.release();
}
bc.flush(true);
@@ -271,15 +271,14 @@ public class BookieJournalTest {
ByteBuffer lenBuff = ByteBuffer.allocate(4);
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
- bc.write(lenBuff);
- bc.write(packet.nioBuffer());
+ bc.write(Unpooled.wrappedBuffer(lenBuff));
+ bc.write(packet);
packet.release();
}
// write fence key
- ByteBuffer packet = generateFenceEntry(1);
- ByteBuffer lenBuf = ByteBuffer.allocate(4);
- lenBuf.putInt(packet.remaining());
- lenBuf.flip();
+ ByteBuf packet = generateFenceEntry(1);
+ ByteBuf lenBuf = Unpooled.buffer();
+ lenBuf.writeInt(packet.readableBytes());
bc.write(lenBuf);
bc.write(packet);
bc.flush(true);
@@ -293,8 +292,8 @@ public class BookieJournalTest {
BufferedChannel bc = jc.getBufferedChannel();
- ByteBuffer paddingBuff = ByteBuffer.allocateDirect(2 *
JournalChannel.SECTOR_SIZE);
- ZeroBuffer.put(paddingBuff);
+ ByteBuf paddingBuff = Unpooled.buffer();
+ paddingBuff.writeZero(2 * JournalChannel.SECTOR_SIZE);
byte[] data = new byte[4 * 1024 * 1024];
Arrays.fill(data, (byte)'X');
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
@@ -308,19 +307,17 @@ public class BookieJournalTest {
}
lastConfirmed = i;
length += i;
- ByteBuffer lenBuff = ByteBuffer.allocate(4);
- lenBuff.putInt(packet.readableBytes());
- lenBuff.flip();
+ ByteBuf lenBuff = Unpooled.buffer();
+ lenBuff.writeInt(packet.readableBytes());
bc.write(lenBuff);
- bc.write(packet.nioBuffer());
+ bc.write(packet);
packet.release();
Journal.writePaddingBytes(jc, paddingBuff,
JournalChannel.SECTOR_SIZE);
}
// write fence key
- ByteBuffer packet = generateFenceEntry(1);
- ByteBuffer lenBuf = ByteBuffer.allocate(4);
- lenBuf.putInt(packet.remaining());
- lenBuf.flip();
+ ByteBuf packet = generateFenceEntry(1);
+ ByteBuf lenBuf = Unpooled.buffer();
+ lenBuf.writeInt(packet.readableBytes());
bc.write(lenBuf);
bc.write(packet);
Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
@@ -519,7 +516,7 @@ public class BookieJournalTest {
Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
JournalChannel jc =
writeV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
-
jc.getBufferedChannel().write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+
jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
jc.getBufferedChannel().flush(true);
writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 8de2bfc..144871c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,12 +21,21 @@
package org.apache.bookkeeper.bookie;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -35,15 +44,8 @@ import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.bookkeeper.util.IOUtils;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -52,8 +54,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
/**
* LedgerCache related test cases
*/
@@ -499,7 +499,7 @@ public class LedgerCacheTest {
}
@Override
- public void process(long ledgerId, long entryId, ByteBuffer buffer)
throws IOException {
+ public void process(long ledgerId, long entryId, ByteBuf buffer)
throws IOException {
if (injectFlushException.get()) {
throw new IOException("Injected Exception");
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index ec60799..7606c91 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -22,17 +22,20 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.List;
import java.util.HashSet;
+import java.util.List;
import java.util.Random;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Test;
import org.junit.Before;
+import org.junit.Test;
public class TestEntryMemTable implements CacheCallback, SkipListFlusher,
CheckpointSource {
@@ -97,7 +100,7 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
// No-op
}
- public void process(long ledgerId, long entryId, ByteBuffer entry)
+ public void process(long ledgerId, long entryId, ByteBuf entry)
throws IOException {
// No-op
}
@@ -135,7 +138,7 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
}
@Override
- public void process(long ledgerId, long entryId, ByteBuffer entry)
throws IOException {
+ public void process(long ledgerId, long entryId, ByteBuf entry) throws
IOException {
assertTrue(ledgerId + ":" + entryId + " is duplicate in store!",
keyValues.add(new EntryKeyValue(ledgerId, entryId,
entry.array())));
}
@@ -143,7 +146,7 @@ public class TestEntryMemTable implements CacheCallback,
SkipListFlusher, Checkp
private class NoLedgerFLusher implements SkipListFlusher {
@Override
- public void process(long ledgerId, long entryId, ByteBuffer entry)
throws IOException {
+ public void process(long ledgerId, long entryId, ByteBuf entry) throws
IOException {
throw new NoLedgerException(ledgerId);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index fba9594..00e3d15 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.io.BufferedWriter;
import java.io.File;
@@ -96,8 +97,8 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
lenBuff.putInt(packet.readableBytes());
lenBuff.flip();
- bc.write(lenBuff);
- bc.write(packet.nioBuffer());
+ bc.write(Unpooled.wrappedBuffer(lenBuff));
+ bc.write(packet);
packet.release();
}
bc.flush(true);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 824f72f..270a60f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
@@ -121,4 +122,28 @@ public class DoubleByteBufTest {
assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }),
b.nioBuffer());
}
+
+ /**
+ * Verify that readableBytes() returns writerIndex - readerIndex. In this
case writerIndex is the end of the buffer
+ * and readerIndex is increased by 64.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReadableBytes() throws Exception {
+ ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b1.writerIndex(b1.capacity());
+ ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+ b2.writerIndex(b2.capacity());
+ ByteBuf buf = DoubleByteBuf.get(b1, b2);
+
+ assertEquals(buf.readerIndex(), 0);
+ assertEquals(buf.writerIndex(), 256);
+ assertEquals(buf.readableBytes(), 256);
+
+ for (int i = 0; i < 4; ++i) {
+ buf.skipBytes(64);
+ assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].