This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f5e4a983e4 Improve DefaultEntryLogger read performance. (#4038)
f5e4a983e4 is described below
commit f5e4a983e48f90e40dad4115be43505d47434e0a
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Feb 12 09:20:14 2024 +0800
Improve DefaultEntryLogger read performance. (#4038)
* Avoid system call to improve read performance.
* Fix ci.
* Add comments for getCurrentWritingLogId
* Fix ci.
* Consider compacting log.
* Fix checkstyle.
* Address the comment.
* Address comment.
* Address the comments.
* Add tests.
* Fix checkstyle.
* address the comments.
* Fix concurrency problem.
---
.../bookkeeper/bookie/BufferedReadChannel.java | 27 +++++++++++-
.../bookkeeper/bookie/DefaultEntryLogger.java | 13 +++++-
.../bookkeeper/bookie/EntryLogManagerBase.java | 6 ++-
.../bookie/EntryLogManagerForSingleEntryLog.java | 5 ++-
.../bookkeeper/bookie/EntryLoggerAllocator.java | 27 +++++++++---
.../bookie/TransactionalEntryLogCompactor.java | 11 +++++
.../bookkeeper/bookie/DefaultEntryLogTest.java | 48 ++++++++++++++++++++++
7 files changed, 125 insertions(+), 12 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
index 22f5a81690..4de3890e08 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
@@ -30,7 +30,7 @@ import java.nio.channels.FileChannel;
/**
* A Buffered channel without a write buffer. Only reads are buffered.
*/
-public class BufferedReadChannel extends BufferedChannelBase {
+public class BufferedReadChannel extends BufferedChannelBase {
// The capacity of the read buffer.
protected final int readCapacity;
@@ -43,9 +43,16 @@ public class BufferedReadChannel extends BufferedChannelBase
{
long invocationCount = 0;
long cacheHitCount = 0;
+ private volatile long fileSize = -1;
+ final boolean sealed;
public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
+ this(fileChannel, readCapacity, false);
+ }
+
+ public BufferedReadChannel(FileChannel fileChannel, int readCapacity,
boolean sealed) {
super(fileChannel);
+ this.sealed = sealed;
this.readCapacity = readCapacity;
this.readBuffer = Unpooled.buffer(readCapacity);
}
@@ -64,10 +71,26 @@ public class BufferedReadChannel extends
BufferedChannelBase {
return read(dest, pos, dest.writableBytes());
}
+ @Override
+ public long size() throws IOException {
+ if (sealed) {
+ if (fileSize == -1) {
+ synchronized (this) {
+ if (fileSize == -1) {
+ fileSize = validateAndGetFileChannel().size();
+ }
+ }
+ }
+ return fileSize;
+ } else {
+ return validateAndGetFileChannel().size();
+ }
+ }
+
public synchronized int read(ByteBuf dest, long pos, int length) throws
IOException {
invocationCount++;
long currentPosition = pos;
- long eof = validateAndGetFileChannel().size();
+ long eof = size();
// return -1 if the given position is greater than or equal to the
file's current size.
if (pos >= eof) {
return -1;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
index d02ede52fb..c47c0411c2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
@@ -587,6 +587,10 @@ public class DefaultEntryLogger implements EntryLogger {
}
}
+ void clearCompactingLogId() {
+ entryLoggerAllocator.clearCompactingLogId();
+ }
+
/**
* Flushes all rotated log channels. After log channels are flushed,
* move leastUnflushedLogId ptr to current logId.
@@ -894,7 +898,8 @@ public class DefaultEntryLogger implements EntryLogger {
}
}
- private BufferedReadChannel getChannelForLogId(long entryLogId) throws
IOException {
+ @VisibleForTesting
+ BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException
{
BufferedReadChannel fc = getFromChannels(entryLogId);
if (fc != null) {
return fc;
@@ -910,7 +915,11 @@ public class DefaultEntryLogger implements EntryLogger {
}
// We set the position of the write buffer of this buffered channel to
Long.MAX_VALUE
// so that there are no overlaps with the write buffer while reading
- fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
+ if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) {
+ fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(),
entryLoggerAllocator.isSealed(entryLogId));
+ } else {
+ fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(),
false);
+ }
putInReadChannels(entryLogId, fc);
return fc;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
index 36ce928a08..e997906c23 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -161,6 +161,7 @@ abstract class EntryLogManagerBase implements
EntryLogManager {
logChannel.appendLedgersMap();
BufferedLogChannel newLogChannel =
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+ entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
log.info("Flushing entry logger {} back to filesystem, pending for
syncing entry loggers : {}.",
logChannel.getLogId(), rotatedLogChannels);
@@ -168,8 +169,9 @@ abstract class EntryLogManagerBase implements
EntryLogManager {
listener.onRotateEntryLog();
}
} else {
- setCurrentLogForLedgerAndAddToRotate(ledgerId,
-
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
+ BufferedLogChannel newLogChannel =
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+ entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
+ setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
index 59bcc02a57..b784511868 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -262,6 +262,9 @@ class EntryLogManagerForSingleEntryLog extends
EntryLogManagerBase {
@Override
public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction()
throws IOException {
- return
entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
+ BufferedLogChannel newLogForCompaction =
entryLoggerAllocator.createNewLogForCompaction(
+ selectDirForNextEntryLog());
+
entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId());
+ return newLogForCompaction;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index 68fc1eb3ca..aec2fb1cd0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -64,6 +64,8 @@ class EntryLoggerAllocator {
private final boolean entryLogPreAllocationEnabled;
private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader =
Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
+ private volatile long writingLogId = -1;
+ private volatile long writingCompactingLogId = -1;
EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager
ledgerDirsManager,
DefaultEntryLogger.RecentEntryLogsStatus
recentlyCreatedEntryLogsStatus, long logId,
@@ -91,16 +93,19 @@ class EntryLoggerAllocator {
return preallocatedLogId;
}
+ public boolean isSealed(long logId) {
+ return logId != writingLogId && logId != writingCompactingLogId;
+ }
+
BufferedLogChannel createNewLog(File dirForNextEntryLog) throws
IOException {
synchronized (createEntryLogLock) {
BufferedLogChannel bc;
- if (!entryLogPreAllocationEnabled){
+ if (!entryLogPreAllocationEnabled) {
// create a new log directly
- bc = allocateNewLog(dirForNextEntryLog);
- return bc;
+ return allocateNewLog(dirForNextEntryLog);
} else {
// allocate directly to response request
- if (null == preallocation){
+ if (null == preallocation) {
bc = allocateNewLog(dirForNextEntryLog);
} else {
// has a preallocated entry log
@@ -116,7 +121,7 @@ class EntryLoggerAllocator {
throw new IOException("Task to allocate a new entry
log is cancelled.", ce);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- throw new IOException("Intrrupted when waiting a new
entry log to be allocated.", ie);
+ throw new IOException("Interrupted when waiting a new
entry log to be allocated.", ie);
}
}
// preallocate a new log in background upon every call
@@ -132,6 +137,18 @@ class EntryLoggerAllocator {
}
}
+ void setWritingLogId(long lodId) {
+ this.writingLogId = lodId;
+ }
+
+ void setWritingCompactingLogId(long logId) {
+ this.writingCompactingLogId = logId;
+ }
+
+ void clearCompactingLogId() {
+ writingCompactingLogId = -1;
+ }
+
private synchronized BufferedLogChannel allocateNewLog(File
dirForNextEntryLog) throws IOException {
return allocateNewLog(dirForNextEntryLog, ".log");
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index 2b6fca30c1..9a27bcccd8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -199,6 +199,7 @@ public class TransactionalEntryLogCompactor extends
AbstractLogCompactor {
LOG.info("No valid entry is found in entry log after scan,
removing entry log now.");
logRemovalListener.removeEntryLog(metadata.getEntryLogId());
compactionLog.abort();
+ compactingLogWriteDone();
return false;
}
return true;
@@ -209,6 +210,13 @@ public class TransactionalEntryLogCompactor extends
AbstractLogCompactor {
offsets.clear();
// since we haven't flushed yet, we only need to delete the
unflushed compaction file.
compactionLog.abort();
+ compactingLogWriteDone();
+ }
+ }
+
+ private void compactingLogWriteDone() {
+ if (entryLogger instanceof DefaultEntryLogger) {
+ ((DefaultEntryLogger) entryLogger).clearCompactingLogId();
}
}
@@ -241,6 +249,8 @@ public class TransactionalEntryLogCompactor extends
AbstractLogCompactor {
} catch (IOException ioe) {
LOG.warn("Error marking compaction as done", ioe);
return false;
+ } finally {
+ compactingLogWriteDone();
}
}
@@ -249,6 +259,7 @@ public class TransactionalEntryLogCompactor extends
AbstractLogCompactor {
offsets.clear();
// remove compaction log file and its hardlink
compactionLog.abort();
+ compactingLogWriteDone();
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
index 38a9ebaf21..3048ef33a8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
@@ -67,6 +67,7 @@ import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirExcepti
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
@@ -154,6 +155,53 @@ public class DefaultEntryLogTest {
assertEquals(0L, entryLogManager.getCurrentLogId());
}
+ @Test
+ public void testEntryLogIsSealedWithPerLedgerDisabled() throws Exception {
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setEntryLogPerLedgerEnabled(false);
+ conf.setEntryLogFilePreAllocationEnabled(true);
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsProvider.TestStatsLogger statsLogger =
+
statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
+ DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr,
null, statsLogger,
+ UnpooledByteBufAllocator.DEFAULT);
+ EntryLogManagerBase entrylogManager = (EntryLogManagerBase)
entryLogger.getEntryLogManager();
+ entrylogManager.createNewLog(0);
+ BufferedReadChannel channel = entryLogger.getChannelForLogId(0);
+ assertFalse(channel.sealed);
+ entrylogManager.createNewLog(1);
+ channel = entryLogger.getChannelForLogId(0);
+ assertFalse(channel.sealed);
+ entrylogManager.createNewLog(2);
+ channel = entryLogger.getChannelForLogId(1);
+ assertTrue(channel.sealed);
+ }
+
+ @Test
+ public void testEntryLogIsSealedWithPerLedgerEnabled() throws Exception {
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ //If entryLogPerLedgerEnabled is true, the buffer channel sealed flag
always false.
+ conf.setEntryLogPerLedgerEnabled(true);
+ conf.setEntryLogFilePreAllocationEnabled(true);
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsProvider.TestStatsLogger statsLogger =
+
statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
+ DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr,
null, statsLogger,
+ UnpooledByteBufAllocator.DEFAULT);
+ EntryLogManagerBase entrylogManager = (EntryLogManagerBase)
entryLogger.getEntryLogManager();
+ entrylogManager.createNewLog(0);
+ BufferedReadChannel channel = entryLogger.getChannelForLogId(0);
+ assertFalse(channel.sealed);
+ entrylogManager.createNewLog(1);
+ channel = entryLogger.getChannelForLogId(0);
+ assertFalse(channel.sealed);
+ entrylogManager.createNewLog(2);
+ channel = entryLogger.getChannelForLogId(1);
+ assertFalse(channel.sealed);
+ }
+
@Test
public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws
Exception {
entryLogger.close();