This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1dc9789 Use ByteBuf for entrylogger reads
1dc9789 is described below
commit 1dc9789e89a6bf167ed965edb1ab51fbc449d897
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 18 09:24:17 2017 +0200
Use ByteBuf for entrylogger reads
Original patch from merlimat in yahoo-4.3 branch
Author: Matteo Merli <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Ivan Kelly
<[email protected]>
This patch had conflicts when merged, resolved by
Committer: Enrico Olivelli <[email protected]>
This closes #640 from ivankelly/refcount-for-read
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 35 ++++++++++++----------
.../bookie/InterleavedLedgerStorage.java | 4 +--
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 12 ++++----
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 6 ++--
4 files changed, 31 insertions(+), 26 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 61aa91b..717a8ae 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
import static
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
@@ -854,10 +856,10 @@ public class EntryLogger {
return logChannel.position() + size > Integer.MAX_VALUE;
}
- byte[] readEntry(long ledgerId, long entryId, long location) throws
IOException, Bookie.NoEntryException {
+ ByteBuf readEntry(long ledgerId, long entryId, long location) throws
IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
long pos = location & 0xffffffffL;
- ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+ RecyclableByteBuffer sizeBuff = RecyclableByteBuffer.get();
pos -= 4; // we want to get the ledgerId and length to check
BufferedReadChannel fc;
try {
@@ -868,26 +870,29 @@ public class EntryLogger {
newe.setStackTrace(e.getStackTrace());
throw newe;
}
- if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) !=
sizeBuff.capacity()) {
- throw new Bookie.NoEntryException("Short read from entrylog " +
entryLogId, ledgerId, entryId);
+
+ if (readFromLogChannel(entryLogId, fc, sizeBuff.buffer, pos) !=
sizeBuff.buffer.capacity()) {
+ throw new Bookie.NoEntryException("Short read from entrylog " +
entryLogId,
+ ledgerId, entryId);
}
pos += 4;
- sizeBuff.flip();
- int entrySize = sizeBuff.getInt();
+ sizeBuff.buffer.flip();
+ int entrySize = sizeBuff.buffer.getInt();
+ sizeBuff.recycle();
+
// entrySize does not include the ledgerId
if (entrySize > maxSaneEntrySize) {
LOG.warn("Sanity check failed for entry size of " + entrySize + "
at location " + pos + " in "
+ entryLogId);
-
}
if (entrySize < MIN_SANE_ENTRY_SIZE) {
LOG.error("Read invalid entry length {}", entrySize);
throw new IOException("Invalid entry length " + entrySize);
}
- byte data[] = new byte[entrySize];
- ByteBuffer buff = ByteBuffer.wrap(data);
- int rc = readFromLogChannel(entryLogId, fc, buff, pos);
- if (rc != data.length) {
+
+ ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entrySize,
entrySize);
+ int rc = readFromLogChannel(entryLogId, fc, data.nioBuffer(0,
entrySize), pos);
+ if (rc != entrySize) {
// Note that throwing NoEntryException here instead of IOException
is not
// without risk. If all bookies in a quorum throw this same
exception
// the client will assume that it has reached the end of the
ledger.
@@ -897,15 +902,15 @@ public class EntryLogger {
// returning NoEntryException is mostly safe.
throw new Bookie.NoEntryException("Short read for " + ledgerId +
"@"
+ entryId + " in " + entryLogId
+ "@"
- + pos + "(" + rc + "!=" +
data.length + ")", ledgerId, entryId);
+ + pos + "(" + rc + "!=" +
entrySize + ")", ledgerId, entryId);
}
- buff.flip();
- long thisLedgerId = buff.getLong();
+ data.writerIndex(entrySize);
+ long thisLedgerId = data.getLong(0);
if (thisLedgerId != ledgerId) {
throw new IOException("problem found in " + entryLogId + "@" +
entryId + " at position + " + pos
+ " entry belongs to " + thisLedgerId + " not " +
ledgerId);
}
- long thisEntryId = buff.getLong();
+ long thisEntryId = data.getLong(8);
if (thisEntryId != entryId) {
throw new IOException("problem found in " + entryLogId + "@" +
entryId + " at position + " + pos
+ " entry is " + thisEntryId + " not " + entryId);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index f773e05..19a6105 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -316,9 +316,9 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
startTimeNanos = MathUtils.nowInNano();
success = false;
try {
- byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
+ ByteBuf retBytes = entryLogger.readEntry(ledgerId, entryId,
offset);
success = true;
- return Unpooled.wrappedBuffer(retBytes);
+ return retBytes;
} finally {
if (success) {
getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 95a5ed3..e939123 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -152,12 +152,12 @@ public class EntryLogTest {
for (int i=0; i<2*numLogs; i++) {
for (int j=0; j<numEntries; j++) {
String expectedValue = "ledger-" + i + "-" + j;
- byte[] value = newLogger.readEntry(i, j, positions[i][j]);
- ByteBuffer buf = ByteBuffer.wrap(value);
- long ledgerId = buf.getLong();
- long entryId = buf.getLong();
- byte[] data = new byte[buf.remaining()];
- buf.get(data);
+ ByteBuf value = newLogger.readEntry(i, j, positions[i][j]);
+ long ledgerId = value.readLong();
+ long entryId = value.readLong();
+ byte[] data = new byte[value.readableBytes()];
+ value.readBytes(data);
+ value.release();
assertEquals(i, ledgerId);
assertEquals(j, entryId);
assertEquals(expectedValue, new String(data));
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 3f5d565..5f62ecc 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -303,9 +303,9 @@ public class LedgerCacheTest {
assertFalse("After flush index file should be changed",
before.equals(after));
// Verify written entries
- Assert.assertArrayEquals(generateEntry(1, 1).array(),
ledgerStorage.getEntry(1, 1).array());
- Assert.assertArrayEquals(generateEntry(1, 2).array(),
ledgerStorage.getEntry(1, 2).array());
- Assert.assertArrayEquals(generateEntry(1, 3).array(),
ledgerStorage.getEntry(1, 3).array());
+ Assert.assertEquals(generateEntry(1, 1), ledgerStorage.getEntry(1, 1));
+ Assert.assertEquals(generateEntry(1, 2), ledgerStorage.getEntry(1, 2));
+ Assert.assertEquals(generateEntry(1, 3), ledgerStorage.getEntry(1, 3));
}
/**
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].