This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.16 by this push:
new e0f23cf021 [feature] [server] add
dbStorage_readAheadCacheBatchBytesSize properties when read ahead entries
(#3895) (#4463)
e0f23cf021 is described below
commit e0f23cf021ef9d241acb43a8ac2a06aa487d5538
Author: Hang Chen <[email protected]>
AuthorDate: Mon Jul 8 17:14:43 2024 +0800
[feature] [server] add dbStorage_readAheadCacheBatchBytesSize properties
when read ahead entries (#3895) (#4463)
* [feature] [server] add dbStorage_readAheadCacheBatchBytesSize properties
when read ahead entries
---------
Co-authored-by: lushiji <[email protected]>
(cherry picked from commit f5455f01584b1b0a592f020eed49d3cb774da0a9)
Co-authored-by: StevenLuMT <[email protected]>
---
.../bookie/storage/ldb/DbLedgerStorage.java | 11 +-
.../ldb/SingleDirectoryDbLedgerStorage.java | 25 +-
.../apache/bookkeeper/bookie/TestBookieImpl.java | 28 +-
.../storage/ldb/DbLedgerStorageReadCacheTest.java | 368 +++++++++++++++++++++
.../storage/ldb/DbLedgerStorageWriteCacheTest.java | 10 +-
conf/bk_server.conf | 3 +
6 files changed, 429 insertions(+), 16 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 8824b1cb6f..60f752e226 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -102,7 +102,10 @@ public class DbLedgerStorage implements LedgerStorage {
(long) (0.25 * PlatformDependent.estimateMaxDirectMemory()) / MB;
static final String READ_AHEAD_CACHE_BATCH_SIZE =
"dbStorage_readAheadCacheBatchSize";
+ static final String READ_AHEAD_CACHE_BATCH_BYTES_SIZE =
"dbStorage_readAheadCacheBatchBytesSize";
private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+ // Default is -1, which disables the per-batch read-ahead bytes limit.
+ private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE = -1;
private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB =
(long) (0.125 * PlatformDependent.estimateMaxDirectMemory())
@@ -171,6 +174,8 @@ public class DbLedgerStorage implements LedgerStorage {
long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE,
DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+ long readAheadCacheBatchBytesSize =
conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
+ DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);
ledgerStorageList = Lists.newArrayList();
for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
@@ -237,7 +242,7 @@ public class DbLedgerStorage implements LedgerStorage {
idm, entrylogger,
statsLogger, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
- readAheadCacheBatchSize));
+ readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
@@ -276,11 +281,11 @@ public class DbLedgerStorage implements LedgerStorage {
protected SingleDirectoryDbLedgerStorage
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger statsLogger, long
writeCacheSize, long readCacheSize,
- int readAheadCacheBatchSize)
+ int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize)
throws IOException {
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager,
ledgerDirsManager, indexDirsManager, entryLogger,
statsLogger, allocator,
writeCacheSize, readCacheSize,
- readAheadCacheBatchSize);
+ readAheadCacheBatchSize,
readAheadCacheBatchBytesSize);
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 7f004c20e5..61aebd8e1a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -137,6 +137,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
private final long writeCacheMaxSize;
private final long readCacheMaxSize;
private final int readAheadCacheBatchSize;
+ private final long readAheadCacheBatchBytesSize;
private final long maxThrottleTimeNanos;
@@ -152,7 +153,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger
statsLogger, ByteBufAllocator allocator,
- long writeCacheSize, long
readCacheSize, int readAheadCacheBatchSize)
+ long writeCacheSize, long
readCacheSize, int readAheadCacheBatchSize,
+ long readAheadCacheBatchBytesSize)
throws IOException {
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");
@@ -182,6 +184,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
readCacheMaxSize = readCacheSize;
this.readAheadCacheBatchSize = readAheadCacheBatchSize;
+ this.readAheadCacheBatchBytesSize = readAheadCacheBatchBytesSize;
// Do not attempt to perform read-ahead more than half the total size
of the cache
maxReadAheadBytesSize = readCacheMaxSize / 2;
@@ -663,9 +666,7 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
long currentEntryLogId = firstEntryLogId;
long currentEntryLocation = firstEntryLocation;
- while (count < readAheadCacheBatchSize
- && size < maxReadAheadBytesSize
- && currentEntryLogId == firstEntryLogId) {
+ while (chargeReadAheadCache(count, size) && currentEntryLogId ==
firstEntryLogId) {
ByteBuf entry = entryLogger.readEntry(orginalLedgerId,
firstEntryId, currentEntryLocation);
@@ -703,6 +704,17 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
}
+ /**
+  * Decides whether the read-ahead loop may load one more entry into the read cache.
+  *
+  * @param currentReadAheadCount number of entries already read ahead in the current batch
+  * @param currentReadAheadBytes number of bytes already read ahead in the current batch
+  * @return {@code true} while the count is below {@code readAheadCacheBatchSize}, the bytes are
+  *     below {@code maxReadAheadBytesSize} and, when {@code readAheadCacheBatchBytesSize} is
+  *     positive, the bytes are also below that limit; {@code false} otherwise
+  */
+ protected boolean chargeReadAheadCache(int currentReadAheadCount, long currentReadAheadBytes) {
+     // Limits that predate the bytes-based setting: entry count and the overall read-ahead cap.
+     if (currentReadAheadCount >= readAheadCacheBatchSize
+             || currentReadAheadBytes >= maxReadAheadBytesSize) {
+         return false;
+     }
+     // The per-batch bytes limit only takes part when it is configured to a positive value.
+     if (readAheadCacheBatchBytesSize <= 0) {
+         return true;
+     }
+     return currentReadAheadBytes < readAheadCacheBatchBytesSize;
+ }
+
public ByteBuf getLastEntry(long ledgerId) throws IOException,
BookieException {
throwIfLimbo(ledgerId);
@@ -1278,4 +1290,9 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
}
}
}
+
+ /**
+  * Exposes the {@code dbLedgerStorageStats} field of this storage so tests can read it.
+  *
+  * @return the stats object held by this storage instance
+  */
+ @VisibleForTesting
+ DbLedgerStorageStats getDbLedgerStorageStats() {
+ return dbLedgerStorageStats;
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
index a6bb99174e..cd0e967b61 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.NullMetadataBookieDriver;
import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,19 @@ public class TestBookieImpl extends BookieImpl {
this(new ResourceBuilder(conf).build());
}
+ /**
+  * Creates a test bookie from pre-built resources and an explicit stats logger.
+  *
+  * <p>The components held by {@code resources} are handed to the superclass constructor together
+  * with {@code statsLogger}, {@code UnpooledByteBufAllocator.DEFAULT} and a
+  * {@code SimpleBookieServiceInfoProvider} built from {@code resources.conf}.
+  *
+  * @param resources the pre-built bookie components; also stored in {@code this.resources}
+  * @param statsLogger the stats logger passed to the superclass constructor
+  * @throws Exception if the superclass constructor fails
+  */
+ public TestBookieImpl(Resources resources, StatsLogger statsLogger) throws
Exception {
+ super(resources.conf,
+ resources.registrationManager,
+ resources.storage,
+ resources.diskChecker,
+ resources.ledgerDirsManager,
+ resources.indexDirsManager,
+ statsLogger,
+ UnpooledByteBufAllocator.DEFAULT,
+ new SimpleBookieServiceInfoProvider(resources.conf));
+ this.resources = resources;
+ }
+
public TestBookieImpl(Resources resources) throws Exception {
super(resources.conf,
resources.registrationManager,
@@ -157,12 +171,16 @@ public class TestBookieImpl extends BookieImpl {
return this;
}
- Resources build() throws Exception {
+ /**
+  * Builds the resources using {@code NullStatsLogger.INSTANCE} as the stats logger.
+  *
+  * @return the result of {@code build(NullStatsLogger.INSTANCE)}
+  * @throws Exception if the delegated build fails
+  */
+ public Resources build() throws Exception {
+ return build(NullStatsLogger.INSTANCE);
+ }
+
+ public Resources build(StatsLogger statsLogger) throws Exception {
if (metadataBookieDriver == null) {
if (conf.getMetadataServiceUri() == null) {
metadataBookieDriver = new NullMetadataBookieDriver();
} else {
- metadataBookieDriver =
BookieResources.createMetadataDriver(conf, NullStatsLogger.INSTANCE);
+ metadataBookieDriver =
BookieResources.createMetadataDriver(conf, statsLogger);
}
}
if (registrationManager == null) {
@@ -173,13 +191,13 @@ public class TestBookieImpl extends BookieImpl {
DiskChecker diskChecker = BookieResources.createDiskChecker(conf);
LedgerDirsManager ledgerDirsManager =
BookieResources.createLedgerDirsManager(
- conf, diskChecker, NullStatsLogger.INSTANCE);
+ conf, diskChecker, statsLogger);
LedgerDirsManager indexDirsManager =
BookieResources.createIndexDirsManager(
- conf, diskChecker, NullStatsLogger.INSTANCE,
ledgerDirsManager);
+ conf, diskChecker, statsLogger, ledgerDirsManager);
LedgerStorage storage = BookieResources.createLedgerStorage(
conf, ledgerManager, ledgerDirsManager, indexDirsManager,
- NullStatsLogger.INSTANCE,
UnpooledByteBufAllocator.DEFAULT);
+ statsLogger, UnpooledByteBufAllocator.DEFAULT);
return new Resources(conf,
metadataBookieDriver,
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java
new file mode 100644
index 0000000000..81ef7f9495
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java
@@ -0,0 +1,368 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_BATCH_BYTES_SIZE;
+import static
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_BATCH_SIZE;
+import static
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.List;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
+import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for the read-ahead cache limits (entry count and batch bytes) of {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageReadCacheTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DbLedgerStorageReadCacheTest.class);
+
+ /**
+  * Verifies that {@code chargeReadAheadCache} keeps the count and max-bytes limits when the
+  * batch-bytes limit is left at -1 (not set in the configuration).
+  *
+  * <p>Failures propagate to the test runner; the storage and temp directory are released in
+  * {@code finally}.
+  */
+ @Test
+ public void chargeReadAheadCacheRegressionTest() throws Exception {
+     TestDB testDB = new TestDB();
+     try {
+         long readAheadCacheMaxSizeMb = 16L;
+         int readAheadCacheBatchSize = 1024;
+         long readAheadCacheBatchBytesSize = -1;
+         setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize);
+         SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0);
+
+         // case1: currentReadAheadCount < readAheadCacheBatchSize
+         //        currentReadAheadBytes < maxReadAheadBytesSize
+         // result: true
+         int currentReadAheadCount = 1;
+         long currentReadAheadBytes = 1;
+         assertTrue(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes));
+
+         // case2: currentReadAheadCount > readAheadCacheBatchSize
+         //        currentReadAheadBytes < maxReadAheadBytesSize
+         // result: false
+         currentReadAheadCount = readAheadCacheBatchSize + 1;
+         currentReadAheadBytes = 1;
+         assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes));
+
+         // case3: currentReadAheadCount < readAheadCacheBatchSize
+         //        currentReadAheadBytes > maxReadAheadBytesSize (half of the read cache size)
+         // result: false
+         currentReadAheadCount = 1;
+         currentReadAheadBytes = readAheadCacheMaxSizeMb / 2 * 1024 * 1024 + 1;
+         assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes));
+     } finally {
+         // No catch block: an AssertionError or setup failure must fail the test.
+         teardown(testDB.getStorage(), testDB.getTmpDir());
+     }
+ }
+
+ /**
+  * Verifies {@code chargeReadAheadCache} when the batch-bytes limit is configured (2 MB here):
+  * the count limit still applies and the bytes limit rejects a batch that already exceeds it.
+  *
+  * <p>Failures propagate to the test runner; the storage and temp directory are released in
+  * {@code finally}.
+  */
+ @Test
+ public void chargeReadAheadCacheUnitTest() throws Exception {
+     TestDB testDB = new TestDB();
+     try {
+         long readAheadCacheMaxSizeMb = 16L;
+         int readAheadCacheBatchSize = 1024;
+         long readAheadCacheBatchBytesSize = 2 * 1024 * 1024;
+         setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize);
+         SingleDirectoryDbLedgerStorage sdb = testDB.getStorage().getLedgerStorageList().get(0);
+
+         // case1: currentReadAheadCount < readAheadCacheBatchSize
+         //        currentReadAheadBytes < readAheadCacheBatchBytesSize
+         //        currentReadAheadBytes < readCacheMaxSize
+         // result: true
+         int currentReadAheadCount = 1;
+         long currentReadAheadBytes = 1;
+         assertTrue(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes));
+
+         // case2: currentReadAheadCount > readAheadCacheBatchSize
+         //        currentReadAheadBytes < readAheadCacheBatchBytesSize
+         //        currentReadAheadBytes < readCacheMaxSize
+         // result: false
+         currentReadAheadCount = readAheadCacheBatchSize + 1;
+         currentReadAheadBytes = 1;
+         assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes));
+
+         // case3: currentReadAheadCount < readAheadCacheBatchSize
+         //        currentReadAheadBytes > readAheadCacheBatchBytesSize
+         //        currentReadAheadBytes < readCacheMaxSize
+         // result: false
+         currentReadAheadCount = 1;
+         currentReadAheadBytes = readAheadCacheBatchBytesSize + 1;
+         assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, currentReadAheadBytes));
+     } finally {
+         // No catch block: an AssertionError or setup failure must fail the test.
+         teardown(testDB.getStorage(), testDB.getTmpDir());
+     }
+ }
+
+ /**
+  * Compares read-cache misses with and without the batch-bytes limit.
+  *
+  * <p>Both scenarios use a 16 MB read-ahead cache and a batch size of 1024 entries, write entries
+  * 0..1023 to ledgers 0..3 (each entry is three longs plus a 4 KB payload) and then read entries
+  * 0..511 of the four ledgers in an interleaved order. The first scenario also caps each
+  * read-ahead batch at 2 MB.
+  */
+ @Test
+ public void compareDiffReadAheadPerfTest() {
+     // Scenario 1: read-ahead limited by bytes per batch (2 * 1024 * 1024).
+     CacheResult limitedByBytes = readAheadCacheBatchBytesSize();
+
+     // Scenario 2: read-ahead limited by entry count only.
+     CacheResult limitedByCount = readAheadCacheBatchSize();
+
+     // The bytes limit is expected to produce fewer cache misses: large batches of big entries
+     // no longer push other ledgers' entries out of the read cache.
+     long missesWithBytesLimit = limitedByBytes.getCacheMissCount();
+     long missesWithCountLimit = limitedByCount.getCacheMissCount();
+     assertEquals(8, missesWithBytesLimit);
+     assertEquals(132, missesWithCountLimit);
+     assertTrue(missesWithBytesLimit < missesWithCountLimit);
+
+     // Both scenarios issue the same reads, so hits + misses must match.
+     long lookupsWithBytesLimit = limitedByBytes.getCacheMissCount() + limitedByBytes.getCacheHitCount();
+     long lookupsWithCountLimit = limitedByCount.getCacheMissCount() + limitedByCount.getCacheHitCount();
+     assertEquals(lookupsWithBytesLimit, lookupsWithCountLimit);
+ }
+
+ /**
+  * Creates a temp ledger directory and a {@link TestBookieImpl} backed by {@link DbLedgerStorage},
+  * and stores both the storage and the directory on {@code testDB}.
+  *
+  * @param testDB holder that receives the temp directory and the created storage
+  * @param readAheadCacheMaxSizeMb read-ahead cache size in MB; only applied when positive
+  * @param readAheadCacheBatchSize read-ahead batch entry count; only applied when positive
+  * @param readAheadCacheBatchBytesSize read-ahead batch bytes limit; only applied when positive
+  * @throws Exception if the directory or the bookie cannot be created
+  */
+ public void setup(TestDB testDB, long readAheadCacheMaxSizeMb,
+                   int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize) throws Exception {
+     File tmpDir = File.createTempFile("bkTest", ".dir");
+     tmpDir.delete();
+     tmpDir.mkdir();
+     // Record the directory immediately so teardown can remove it even if a later step throws.
+     testDB.setTmpDir(tmpDir);
+     File curDir = BookieImpl.getCurrentDirectory(tmpDir);
+     BookieImpl.checkDirectoryStructure(curDir);
+
+     int gcWaitTime = 1000;
+     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+     conf.setGcWaitTime(gcWaitTime);
+     conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+     conf.setLedgerDirNames(new String[]{tmpDir.toString()});
+     // Non-positive values leave the corresponding property unset.
+     if (readAheadCacheMaxSizeMb > 0) {
+         conf.setProperty(READ_AHEAD_CACHE_MAX_SIZE_MB, readAheadCacheMaxSizeMb);
+     }
+     if (readAheadCacheBatchSize > 0) {
+         conf.setProperty(READ_AHEAD_CACHE_BATCH_SIZE, readAheadCacheBatchSize);
+     }
+     if (readAheadCacheBatchBytesSize > 0) {
+         conf.setProperty(READ_AHEAD_CACHE_BATCH_BYTES_SIZE, readAheadCacheBatchBytesSize);
+     }
+
+     TestStatsProvider.TestStatsLogger statsLogger = new TestStatsProvider().getStatsLogger("test");
+     BookieImpl bookie = new TestBookieImpl(new TestBookieImpl.ResourceBuilder(conf).build(statsLogger),
+             statsLogger);
+
+     DbLedgerStorage storage = (DbLedgerStorage) bookie.getLedgerStorage();
+     for (SingleDirectoryDbLedgerStorage singleDirectoryDbLedgerStorage : storage.getLedgerStorageList()) {
+         assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() instanceof DefaultEntryLogger);
+     }
+     testDB.setStorage(storage);
+ }
+
+ /**
+  * Shuts down the storage (if any) and removes the temp directory (if any) with its contents.
+  *
+  * @param storage the storage to shut down; may be {@code null} when setup failed early
+  * @param tmpDir the temp directory to remove; may be {@code null} when setup failed early
+  */
+ public void teardown(DbLedgerStorage storage, File tmpDir) {
+     if (storage != null) {
+         try {
+             storage.shutdown();
+         } catch (InterruptedException e) {
+             LOGGER.error("storage.shutdown has error", e);
+             // Keep the interrupt visible to the caller instead of silently clearing it.
+             Thread.currentThread().interrupt();
+         }
+     }
+     if (tmpDir != null) {
+         // File.delete() alone fails on a non-empty directory, so remove the contents first.
+         deleteRecursively(tmpDir);
+     }
+ }
+
+ /** Deletes {@code file}; if it is a directory, deletes everything below it first. */
+ private static void deleteRecursively(File file) {
+     File[] children = file.listFiles();
+     if (children != null) {
+         for (File child : children) {
+             deleteRecursively(child);
+         }
+     }
+     if (!file.delete()) {
+         LOGGER.warn("failed to delete {}", file);
+     }
+ }
+
+ /**
+  * Writes entries {@code [minEntryId, maxEntryId)} to every ledger in
+  * {@code [minLedgerId, maxLedgerId)}.
+  *
+  * <p>Each entry holds the ledger id, the entry id, a per-ledger counter that starts at 0 (written
+  * as the lac field) and the payload from {@code get4KbMsg()}.
+  *
+  * @throws Exception if the storage rejects an entry
+  */
+ private void addEntries(DbLedgerStorage storage, long minLedgerId, long maxLedgerId,
+                         long minEntryId, long maxEntryId) throws Exception {
+     for (long ledgerId = minLedgerId; ledgerId < maxLedgerId; ledgerId++) {
+         for (long entryId = minEntryId; entryId < maxEntryId; entryId++) {
+             // The counter restarts at 0 for each ledger and grows by one per entry written.
+             long lac = entryId - minEntryId;
+             ByteBuf entry = Unpooled.buffer(1024);
+             entry.writeLong(ledgerId);
+             entry.writeLong(entryId);
+             entry.writeLong(lac);
+             byte[] payload = get4KbMsg().getBytes();
+             entry.writeBytes(payload);
+             long addedEntryId = storage.addEntry(entry);
+             assertEquals(entryId, addedEntryId);
+         }
+     }
+ }
+
+ /**
+  * Builds the payload used for test entries: the text "1234" repeated 1024 times.
+  *
+  * @return a string of exactly 4 * 1024 characters
+  */
+ private String get4KbMsg() {
+     // StringBuilder suffices: the buffer never leaves this method, so no synchronization is needed.
+     StringBuilder builder = new StringBuilder(4 * 1024);
+     for (int i = 0; i < 1024; i++) {
+         builder.append("1234");
+     }
+     String msg = builder.toString();
+     assertEquals(4 * 1024, msg.length());
+     return msg;
+ }
+
+ /**
+  * Runs the read scenario with a 2 MB batch-bytes limit and reports the read-cache counters.
+  *
+  * <p>Writes entries 0..1023 to ledgers 0..3, flushes, then reads entries 0..511 of the four
+  * ledgers in an interleaved order.
+  *
+  * @return the read-cache miss and hit counts of the first single-directory storage
+  * @throws AssertionError if the scenario fails; the original exception is kept as the cause
+  */
+ private CacheResult readAheadCacheBatchBytesSize() {
+     TestDB testDB = new TestDB();
+     try {
+         long readAheadCacheMaxSizeMb = 2 * 8L;
+         int readAheadCacheBatchSize = 1024;
+         long readAheadCacheBatchBytesSize = 2 * 1024 * 1024;
+         long minEntryId = 0;
+         long maxEntryId = 1024;
+
+         setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize);
+         addEntries(testDB.getStorage(), 0, 4, minEntryId, maxEntryId);
+
+         testDB.getStorage().flush();
+         assertFalse(testDB.getStorage().isFlushRequired());
+         // Read from db
+         for (long eid = minEntryId; eid < maxEntryId / 2; eid++) {
+             testDB.getStorage().getEntry(0, eid);
+             testDB.getStorage().getEntry(1, eid);
+             testDB.getStorage().getEntry(2, eid);
+             testDB.getStorage().getEntry(3, eid);
+         }
+         List<SingleDirectoryDbLedgerStorage> ledgerStorageList = testDB.getStorage().getLedgerStorageList();
+         DbLedgerStorageStats ledgerStats = ledgerStorageList.get(0).getDbLedgerStorageStats();
+         long cacheMissCount = ledgerStats.getReadCacheMissCounter().get();
+         long cacheHitCount = ledgerStats.getReadCacheHitCounter().get();
+         LOGGER.info("simple1.cacheMissCount={},cacheHitCount={}", cacheMissCount, cacheHitCount);
+         return new CacheResult(cacheMissCount, cacheHitCount);
+     } catch (Exception e) {
+         // Fail with the real cause rather than returning zero counters that only trip a later,
+         // unrelated-looking assertion in the caller.
+         throw new AssertionError("test case run error", e);
+     } finally {
+         teardown(testDB.getStorage(), testDB.getTmpDir());
+     }
+ }
+
+ /**
+  * Runs the read scenario without a batch-bytes limit (-1) and reports the read-cache counters.
+  *
+  * <p>Writes entries 0..1023 to ledgers 0..3, flushes, then reads entries 0..511 of the four
+  * ledgers in an interleaved order.
+  *
+  * @return the read-cache miss and hit counts of the first single-directory storage
+  * @throws AssertionError if the scenario fails; the original exception is kept as the cause
+  */
+ public CacheResult readAheadCacheBatchSize() {
+     TestDB testDB = new TestDB();
+     try {
+         long readAheadCacheMaxSizeMb = 2 * 8L;
+         int readAheadCacheBatchSize = 1024;
+         long readAheadCacheBatchBytesSize = -1;
+         long minEntryId = 0;
+         long maxEntryId = 1024;
+
+         setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, readAheadCacheBatchBytesSize);
+         addEntries(testDB.getStorage(), 0, 4, minEntryId, maxEntryId);
+
+         testDB.getStorage().flush();
+         assertFalse(testDB.getStorage().isFlushRequired());
+         // Read from db
+         for (long eid = minEntryId; eid < maxEntryId / 2; eid++) {
+             testDB.getStorage().getEntry(0, eid);
+             testDB.getStorage().getEntry(1, eid);
+             testDB.getStorage().getEntry(2, eid);
+             testDB.getStorage().getEntry(3, eid);
+         }
+         List<SingleDirectoryDbLedgerStorage> ledgerStorageList = testDB.getStorage().getLedgerStorageList();
+         DbLedgerStorageStats ledgerStats = ledgerStorageList.get(0).getDbLedgerStorageStats();
+         long cacheMissCount = ledgerStats.getReadCacheMissCounter().get();
+         long cacheHitCount = ledgerStats.getReadCacheHitCounter().get();
+         LOGGER.info("simple2.cacheMissCount={},cacheHitCount={}", cacheMissCount, cacheHitCount);
+         return new CacheResult(cacheMissCount, cacheHitCount);
+     } catch (Exception e) {
+         // Fail with the real cause rather than returning zero counters that only trip a later,
+         // unrelated-looking assertion in the caller.
+         throw new AssertionError("test case run error", e);
+     } finally {
+         teardown(testDB.getStorage(), testDB.getTmpDir());
+     }
+ }
+
+ /**
+  * Mutable holder for the storage and temp directory created by {@code setup}, so that
+  * {@code teardown} can release whatever was created.
+  *
+  * <p>Declared static because it never touches the enclosing test instance.
+  */
+ private static class TestDB {
+     // Both fields stay null until the corresponding setter is called.
+     private DbLedgerStorage storage;
+     private File tmpDir;
+
+     public DbLedgerStorage getStorage() {
+         return storage;
+     }
+
+     public void setStorage(DbLedgerStorage storage) {
+         this.storage = storage;
+     }
+
+     public File getTmpDir() {
+         return tmpDir;
+     }
+
+     public void setTmpDir(File tmpDir) {
+         this.tmpDir = tmpDir;
+     }
+ }
+
+ /**
+  * Read-cache miss and hit counts captured at the end of one read scenario.
+  *
+  * <p>Declared static because it never touches the enclosing test instance.
+  */
+ private static class CacheResult {
+     private long cacheMissCount;
+     private long cacheHitCount;
+
+     private CacheResult(long cacheMissCount, long cacheHitCount) {
+         this.cacheMissCount = cacheMissCount;
+         this.cacheHitCount = cacheHitCount;
+     }
+
+     public long getCacheMissCount() {
+         return cacheMissCount;
+     }
+
+     public void setCacheMissCount(long cacheMissCount) {
+         this.cacheMissCount = cacheMissCount;
+     }
+
+     public long getCacheHitCount() {
+         return cacheHitCount;
+     }
+
+     public void setCacheHitCount(long cacheHitCount) {
+         this.cacheHitCount = cacheHitCount;
+     }
+ }
+}
\ No newline at end of file
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index a9bee08cb2..102f7f5add 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -56,11 +56,11 @@ public class DbLedgerStorageWriteCacheTest {
protected SingleDirectoryDbLedgerStorage
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
EntryLogger entryLogger, StatsLogger statsLogger,
- long writeCacheSize, long readCacheSize, int
readAheadCacheBatchSize)
+ long writeCacheSize, long readCacheSize, int
readAheadCacheBatchSize, long readAheadCacheBatchBytesSize)
throws IOException {
return new MockedSingleDirectoryDbLedgerStorage(conf,
ledgerManager, ledgerDirsManager, indexDirsManager,
entryLogger, statsLogger, allocator, writeCacheSize,
- readCacheSize, readAheadCacheBatchSize);
+ readCacheSize, readAheadCacheBatchSize,
readAheadCacheBatchBytesSize);
}
private static class MockedSingleDirectoryDbLedgerStorage extends
SingleDirectoryDbLedgerStorage {
@@ -68,9 +68,11 @@ public class DbLedgerStorageWriteCacheTest {
LedgerDirsManager ledgerDirsManager, LedgerDirsManager
indexDirsManager, EntryLogger entryLogger,
StatsLogger statsLogger,
ByteBufAllocator allocator, long writeCacheSize,
- long readCacheSize, int readAheadCacheBatchSize) throws
IOException {
+ long readCacheSize, int readAheadCacheBatchSize, long
readAheadCacheBatchBytesSize)
+ throws IOException {
super(conf, ledgerManager, ledgerDirsManager,
indexDirsManager, entryLogger,
- statsLogger, allocator, writeCacheSize, readCacheSize,
readAheadCacheBatchSize);
+ statsLogger, allocator, writeCacheSize, readCacheSize,
readAheadCacheBatchSize,
+ readAheadCacheBatchBytesSize);
}
@Override
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index f7dbf38032..c0a021418d 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -749,6 +749,9 @@ gcEntryLogMetadataCacheEnabled=false
# By default it will be allocated to 25% of the available direct memory
# dbStorage_readAheadCacheMaxSizeMb=
+# Maximum number of bytes of entries to pre-fill in the cache after a read cache miss.
+# Default is -1. A value of 0 or less disables this limit.
+# dbStorage_readAheadCacheBatchBytesSize=-1
+
# How many entries to pre-fill in cache after a read cache miss
# dbStorage_readAheadCacheBatchSize=100