This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1e63d3c Avoid throwing exception when doing
EntryLogger.internalReadEntry
1e63d3c is described below
commit 1e63d3c83e89d152cb603955e303b5377d8ab8e6
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 2 01:05:42 2019 -0700
Avoid throwing exception when doing EntryLogger.internalReadEntry
### Motivation
In the refactoring part of #1819, the `internalReadEntry()` behavior was
changed into throwing an exception when reading an entry from a different
ledger.
This is causing a big performance issue when doing read-head from the
ledger storage, because we keep reading from the current entry log until we
find an entry from a different ledger.
Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli
<[email protected]>
This closes #2172 from merlimat/read-internal
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 80 ++++++++++++----------
.../ldb/SingleDirectoryDbLedgerStorage.java | 3 +-
2 files changed, 45 insertions(+), 38 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 6662d59..731275c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -756,37 +756,43 @@ public class EntryLogger {
}
}
- private static class EntryLogEntry {
- final int entrySize;
- final BufferedReadChannel fc;
-
- EntryLogEntry(int entrySize, BufferedReadChannel fc) {
- this.entrySize = entrySize;
- this.fc = fc;
- }
- }
-
- private EntryLogEntry getFCForEntryInternal(
+ private BufferedReadChannel getFCForEntryInternal(
long ledgerId, long entryId, long entryLogId, long pos)
throws EntryLookupException, IOException {
- ByteBuf sizeBuff = sizeBuffer.get();
- sizeBuff.clear();
- pos -= 4; // we want to get the entrySize as well as the ledgerId and
entryId
- BufferedReadChannel fc;
try {
- fc = getChannelForLogId(entryLogId);
+ return getChannelForLogId(entryLogId);
} catch (FileNotFoundException e) {
throw new EntryLookupException.MissingLogFileException(ledgerId,
entryId, entryLogId, pos);
}
+ }
+
+ private ByteBuf readEntrySize(long ledgerId, long entryId, long
entryLogId, long pos, BufferedReadChannel fc)
+ throws EntryLookupException, IOException {
+ ByteBuf sizeBuff = sizeBuffer.get();
+ sizeBuff.clear();
+
+ long entrySizePos = pos - 4; // we want to get the entrySize as well
as the ledgerId and entryId
try {
- if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) !=
sizeBuff.capacity()) {
- throw new EntryLookupException.MissingEntryException(ledgerId,
entryId, entryLogId, pos);
+ if (readFromLogChannel(entryLogId, fc, sizeBuff, entrySizePos) !=
sizeBuff.capacity()) {
+ throw new EntryLookupException.MissingEntryException(ledgerId,
entryId, entryLogId, entrySizePos);
}
} catch (BufferedChannelBase.BufferedChannelClosedException |
AsynchronousCloseException e) {
- throw new EntryLookupException.MissingLogFileException(ledgerId,
entryId, entryLogId, pos);
+ throw new EntryLookupException.MissingLogFileException(ledgerId,
entryId, entryLogId, entrySizePos);
}
- pos += 4;
+ return sizeBuff;
+ }
+
+ void checkEntry(long ledgerId, long entryId, long location) throws
EntryLookupException, IOException {
+ long entryLogId = logIdForOffset(location);
+ long pos = posForOffset(location);
+ BufferedReadChannel fc = getFCForEntryInternal(ledgerId, entryId,
entryLogId, pos);
+ ByteBuf sizeBuf = readEntrySize(ledgerId, entryId, entryLogId, pos,
fc);
+ validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuf);
+ }
+
+ private void validateEntry(long ledgerId, long entryId, long entryLogId,
long pos, ByteBuf sizeBuff)
+ throws IOException, EntryLookupException {
int entrySize = sizeBuff.readInt();
// entrySize does not include the ledgerId
@@ -805,23 +811,24 @@ public class EntryLogger {
throw new EntryLookupException.WrongEntryException(
thisEntryId, thisLedgerId, ledgerId, entryId, entryLogId,
pos);
}
- return new EntryLogEntry(entrySize, fc);
- }
-
- void checkEntry(long ledgerId, long entryId, long location) throws
EntryLookupException, IOException {
- long entryLogId = logIdForOffset(location);
- long pos = posForOffset(location);
- getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
}
- public ByteBuf internalReadEntry(long ledgerId, long entryId, long
location)
+ public ByteBuf internalReadEntry(long ledgerId, long entryId, long
location, boolean validateEntry)
throws IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(location);
long pos = posForOffset(location);
- final EntryLogEntry entry;
+
+ BufferedReadChannel fc = null;
+ int entrySize = -1;
try {
- entry = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+ fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+
+ ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId,
pos, fc);
+ entrySize = sizeBuff.getInt(0);
+ if (validateEntry) {
+ validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
+ }
} catch (EntryLookupException.MissingEntryException entryLookupError) {
throw new Bookie.NoEntryException("Short read from entrylog " +
entryLogId,
ledgerId, entryId);
@@ -829,9 +836,9 @@ public class EntryLogger {
throw new IOException(e.toString());
}
- ByteBuf data = allocator.buffer(entry.entrySize, entry.entrySize);
- int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
- if (rc != entry.entrySize) {
+ ByteBuf data = allocator.buffer(entrySize, entrySize);
+ int rc = readFromLogChannel(entryLogId, fc, data, pos);
+ if (rc != entrySize) {
// Note that throwing NoEntryException here instead of IOException
is not
// without risk. If all bookies in a quorum throw this same
exception
// the client will assume that it has reached the end of the
ledger.
@@ -842,16 +849,15 @@ public class EntryLogger {
data.release();
throw new Bookie.NoEntryException("Short read for " + ledgerId +
"@"
+ entryId + " in " + entryLogId
+ "@"
- + pos + "(" + rc + "!=" +
entry.entrySize + ")", ledgerId, entryId);
+ + pos + "(" + rc + "!=" +
entrySize + ")", ledgerId, entryId);
}
- data.writerIndex(entry.entrySize);
+ data.writerIndex(entrySize);
return data;
}
public ByteBuf readEntry(long ledgerId, long entryId, long location)
throws IOException, Bookie.NoEntryException {
- ByteBuf data = internalReadEntry(ledgerId, entryId, location);
- return data;
+ return internalReadEntry(ledgerId, entryId, location, true /*
validateEntry */);
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 58ce2be..e975f16 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -466,7 +466,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
long size = 0;
while (count < readAheadCacheBatchSize && currentEntryLogId ==
firstEntryLogId) {
- ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId,
-1, currentEntryLocation);
+ ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId,
-1, currentEntryLocation,
+ false /* validateEntry */);
try {
long currentEntryLedgerId = entry.getLong(0);