This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.16 by this push:
     new e0f23cf021 [feature] [server] add 
dbStorage_readAheadCacheBatchBytesSize properties when read ahead entries 
(#3895) (#4463)
e0f23cf021 is described below

commit e0f23cf021ef9d241acb43a8ac2a06aa487d5538
Author: Hang Chen <[email protected]>
AuthorDate: Mon Jul 8 17:14:43 2024 +0800

    [feature] [server] add dbStorage_readAheadCacheBatchBytesSize properties 
when read ahead entries (#3895) (#4463)
    
    * [feature] [server] add dbStorage_readAheadCacheBatchBytesSize properties 
when read ahead entries
    
    ---------
    
    Co-authored-by: lushiji <[email protected]>
    (cherry picked from commit f5455f01584b1b0a592f020eed49d3cb774da0a9)
    
    Co-authored-by: StevenLuMT <[email protected]>
---
 .../bookie/storage/ldb/DbLedgerStorage.java        |  11 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  25 +-
 .../apache/bookkeeper/bookie/TestBookieImpl.java   |  28 +-
 .../storage/ldb/DbLedgerStorageReadCacheTest.java  | 368 +++++++++++++++++++++
 .../storage/ldb/DbLedgerStorageWriteCacheTest.java |  10 +-
 conf/bk_server.conf                                |   3 +
 6 files changed, 429 insertions(+), 16 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 8824b1cb6f..60f752e226 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -102,7 +102,10 @@ public class DbLedgerStorage implements LedgerStorage {
         (long) (0.25 * PlatformDependent.estimateMaxDirectMemory()) / MB;
 
     static final String READ_AHEAD_CACHE_BATCH_SIZE = 
"dbStorage_readAheadCacheBatchSize";
+    static final String READ_AHEAD_CACHE_BATCH_BYTES_SIZE = 
"dbStorage_readAheadCacheBatchBytesSize";
     private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+    // the default value is -1. this feature(limit of read ahead bytes) is 
disabled
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE = -1;
 
     private static final long DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB =
         (long) (0.125 * PlatformDependent.estimateMaxDirectMemory())
@@ -171,6 +174,8 @@ public class DbLedgerStorage implements LedgerStorage {
         long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
         long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
         int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, 
DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+        long readAheadCacheBatchBytesSize = 
conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
+                DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);
 
         ledgerStorageList = Lists.newArrayList();
         for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
@@ -237,7 +242,7 @@ public class DbLedgerStorage implements LedgerStorage {
                 idm, entrylogger,
                 statsLogger, perDirectoryWriteCacheSize,
                 perDirectoryReadCacheSize,
-                readAheadCacheBatchSize));
+                readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
             
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
             if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
                 
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
@@ -276,11 +281,11 @@ public class DbLedgerStorage implements LedgerStorage {
     protected SingleDirectoryDbLedgerStorage 
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
             LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, 
LedgerDirsManager indexDirsManager,
             EntryLogger entryLogger, StatsLogger statsLogger, long 
writeCacheSize, long readCacheSize,
-            int readAheadCacheBatchSize)
+            int readAheadCacheBatchSize, long readAheadCacheBatchBytesSize)
             throws IOException {
         return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, 
ledgerDirsManager, indexDirsManager, entryLogger,
                                                   statsLogger, allocator, 
writeCacheSize, readCacheSize,
-                                                  readAheadCacheBatchSize);
+                                                  readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 7f004c20e5..61aebd8e1a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -137,6 +137,7 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     private final long writeCacheMaxSize;
     private final long readCacheMaxSize;
     private final int readAheadCacheBatchSize;
+    private final long readAheadCacheBatchBytesSize;
 
     private final long maxThrottleTimeNanos;
 
@@ -152,7 +153,8 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, 
LedgerManager ledgerManager,
                                           LedgerDirsManager ledgerDirsManager, 
LedgerDirsManager indexDirsManager,
                                           EntryLogger entryLogger, StatsLogger 
statsLogger, ByteBufAllocator allocator,
-                                          long writeCacheSize, long 
readCacheSize, int readAheadCacheBatchSize)
+                                          long writeCacheSize, long 
readCacheSize, int readAheadCacheBatchSize,
+                                          long readAheadCacheBatchBytesSize)
             throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
@@ -182,6 +184,7 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
 
         readCacheMaxSize = readCacheSize;
         this.readAheadCacheBatchSize = readAheadCacheBatchSize;
+        this.readAheadCacheBatchBytesSize = readAheadCacheBatchBytesSize;
 
         // Do not attempt to perform read-ahead more than half the total size 
of the cache
         maxReadAheadBytesSize = readCacheMaxSize / 2;
@@ -663,9 +666,7 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
             long currentEntryLogId = firstEntryLogId;
             long currentEntryLocation = firstEntryLocation;
 
-            while (count < readAheadCacheBatchSize
-                    && size < maxReadAheadBytesSize
-                    && currentEntryLogId == firstEntryLogId) {
+            while (chargeReadAheadCache(count, size) && currentEntryLogId == 
firstEntryLogId) {
                 ByteBuf entry = entryLogger.readEntry(orginalLedgerId,
                         firstEntryId, currentEntryLocation);
 
@@ -703,6 +704,17 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
         }
     }
 
+    protected boolean chargeReadAheadCache(int currentReadAheadCount, long 
currentReadAheadBytes) {
+        // compatible with old logic
+        boolean chargeSizeCondition = currentReadAheadCount < 
readAheadCacheBatchSize
+                && currentReadAheadBytes < maxReadAheadBytesSize;
+        if (chargeSizeCondition && readAheadCacheBatchBytesSize > 0) {
+            // exact limits limit the size and count for each batch
+            chargeSizeCondition = currentReadAheadBytes < 
readAheadCacheBatchBytesSize;
+        }
+        return chargeSizeCondition;
+    }
+
     public ByteBuf getLastEntry(long ledgerId) throws IOException, 
BookieException {
         throwIfLimbo(ledgerId);
 
@@ -1278,4 +1290,9 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
             }
         }
     }
+
+    @VisibleForTesting
+    DbLedgerStorageStats getDbLedgerStorageStats() {
+        return dbLedgerStorageStats;
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
index a6bb99174e..cd0e967b61 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestBookieImpl.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.NullMetadataBookieDriver;
 import org.apache.bookkeeper.proto.SimpleBookieServiceInfoProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,19 @@ public class TestBookieImpl extends BookieImpl {
         this(new ResourceBuilder(conf).build());
     }
 
+    public TestBookieImpl(Resources resources, StatsLogger statsLogger) throws 
Exception {
+        super(resources.conf,
+                resources.registrationManager,
+                resources.storage,
+                resources.diskChecker,
+                resources.ledgerDirsManager,
+                resources.indexDirsManager,
+                statsLogger,
+                UnpooledByteBufAllocator.DEFAULT,
+                new SimpleBookieServiceInfoProvider(resources.conf));
+        this.resources = resources;
+    }
+
     public TestBookieImpl(Resources resources) throws Exception {
         super(resources.conf,
                 resources.registrationManager,
@@ -157,12 +171,16 @@ public class TestBookieImpl extends BookieImpl {
             return this;
         }
 
-        Resources build() throws Exception {
+        public Resources build() throws Exception {
+            return build(NullStatsLogger.INSTANCE);
+        }
+
+        public Resources build(StatsLogger statsLogger) throws Exception {
             if (metadataBookieDriver == null) {
                 if (conf.getMetadataServiceUri() == null) {
                     metadataBookieDriver = new NullMetadataBookieDriver();
                 } else {
-                    metadataBookieDriver = 
BookieResources.createMetadataDriver(conf, NullStatsLogger.INSTANCE);
+                    metadataBookieDriver = 
BookieResources.createMetadataDriver(conf, statsLogger);
                 }
             }
             if (registrationManager == null) {
@@ -173,13 +191,13 @@ public class TestBookieImpl extends BookieImpl {
 
             DiskChecker diskChecker = BookieResources.createDiskChecker(conf);
             LedgerDirsManager ledgerDirsManager = 
BookieResources.createLedgerDirsManager(
-                    conf, diskChecker, NullStatsLogger.INSTANCE);
+                    conf, diskChecker, statsLogger);
             LedgerDirsManager indexDirsManager = 
BookieResources.createIndexDirsManager(
-                    conf, diskChecker, NullStatsLogger.INSTANCE, 
ledgerDirsManager);
+                    conf, diskChecker, statsLogger, ledgerDirsManager);
 
             LedgerStorage storage = BookieResources.createLedgerStorage(
                     conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                    NullStatsLogger.INSTANCE, 
UnpooledByteBufAllocator.DEFAULT);
+                    statsLogger, UnpooledByteBufAllocator.DEFAULT);
 
             return new Resources(conf,
                                  metadataBookieDriver,
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java
new file mode 100644
index 0000000000..81ef7f9495
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageReadCacheTest.java
@@ -0,0 +1,368 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_BATCH_BYTES_SIZE;
+import static 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_BATCH_SIZE;
+import static 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.List;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
+import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageReadCacheTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DbLedgerStorageReadCacheTest.class);
+
+    @Test
+    public void chargeReadAheadCacheRegressionTest() {
+        TestDB testDB = new TestDB();
+        try {
+            long readAheadCacheMaxSizeMb = 16L;
+            int readAheadCacheBatchSize = 1024;
+            long readAheadCacheBatchBytesSize = -1;
+            setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize);
+            SingleDirectoryDbLedgerStorage sdb = 
testDB.getStorage().getLedgerStorageList().get(0);
+            /**
+             * case1: currentReadAheadCount < readAheadCacheBatchSize
+             *        currentReadAheadBytes < maxReadAheadBytesSize
+             * result: true
+             */
+            int currentReadAheadCount = 1;
+            long currentReadAheadBytes = 1;
+            assertTrue(sdb.chargeReadAheadCache(currentReadAheadCount, 
currentReadAheadBytes));
+
+            /**
+             * case2: currentReadAheadCount > readAheadCacheBatchSize
+             *        currentReadAheadBytes < maxReadAheadBytesSize
+             * result: false
+             */
+            currentReadAheadCount = readAheadCacheBatchSize + 1;
+            currentReadAheadBytes = 1;
+            assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, 
currentReadAheadBytes));
+
+            /**
+             * case3: currentReadAheadCount < readAheadCacheBatchSize
+             *        currentReadAheadBytes > maxReadAheadBytesSize
+             * result: false
+             */
+            currentReadAheadCount = 1;
+            currentReadAheadBytes = readAheadCacheMaxSizeMb / 2 * 1024 * 1024 
+ 1;
+            assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, 
currentReadAheadBytes));
+        } catch (Throwable e) {
+            LOGGER.error("readAheadCacheBatchSizeUnitTest run error", e);
+        } finally {
+            teardown(testDB.getStorage(), testDB.getTmpDir());
+        }
+    }
+
+    @Test
+    public void chargeReadAheadCacheUnitTest() {
+        TestDB testDB = new TestDB();
+        try {
+            long readAheadCacheMaxSizeMb = 16L;
+            int readAheadCacheBatchSize = 1024;
+            long readAheadCacheBatchBytesSize = 2 * 1024 * 1024;
+            setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize);
+            SingleDirectoryDbLedgerStorage sdb = 
testDB.getStorage().getLedgerStorageList().get(0);
+            /**
+             * case1: currentReadAheadCount < readAheadCacheBatchSize
+             *        currentReadAheadBytes < readAheadCacheBatchBytesSize
+             *        currentReadAheadBytes < readCacheMaxSize
+             * result: true
+             */
+            int currentReadAheadCount = 1;
+            long currentReadAheadBytes = 1;
+            assertTrue(sdb.chargeReadAheadCache(currentReadAheadCount, 
currentReadAheadBytes));
+
+            /**
+             * case2: currentReadAheadCount > readAheadCacheBatchSize
+             *        currentReadAheadBytes < readAheadCacheBatchBytesSize
+             *        currentReadAheadBytes < readCacheMaxSize
+             * result: false
+             */
+            currentReadAheadCount = readAheadCacheBatchSize + 1;
+            currentReadAheadBytes = 1;
+            assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, 
currentReadAheadBytes));
+
+            /**
+             * case3: currentReadAheadCount < readAheadCacheBatchSize
+             *        currentReadAheadBytes > readAheadCacheBatchBytesSize
+             *        currentReadAheadBytes < readCacheMaxSize
+             * result: false
+             */
+            currentReadAheadCount = 1;
+            currentReadAheadBytes = readAheadCacheBatchBytesSize + 1;
+            assertFalse(sdb.chargeReadAheadCache(currentReadAheadCount, 
currentReadAheadBytes));
+        } catch (Throwable e) {
+            LOGGER.error("readAheadCacheBatchSizeUnitTest run error", e);
+        } finally {
+            teardown(testDB.getStorage(), testDB.getTmpDir());
+        }
+    }
+
+    @Test
+    public void compareDiffReadAheadPerfTest() {
+        /**
+         * case1(read ahead cache by limit bytes size):
+         * config: readAheadCacheMaxSizeMb = 2 * 8;
+         *         readAheadCacheBatchSize = 1024;
+         *         readAheadCacheBatchBytesSize = 2 * 1024 * 1024;
+         * case content:
+         * LedgerId:0, read 1024 pieces of entry,each piece of entry is 10KB
+         * LedgerId:1, read 1024 pieces of entry,each piece of entry is 10KB
+         * LedgerId:2, read 1024 pieces of entry,each piece of entry is 10KB
+         * LedgerId:3, read 1024 pieces of entry,each piece of entry is 10KB
+         */
+        CacheResult cacheBatchBytesSizeResult = readAheadCacheBatchBytesSize();
+
+        /**
+         * case2(read ahead cache by limit count):
+         * config: readAheadCacheMaxSizeMb = 2 * 8;
+         *         readAheadCacheBatchSize = 1024;
+         * case content:
+         * LedgerId:0, read 1024 pieces of entry,each piece of entry is 10KB
+         * LedgerId:1, read 1024 pieces of entry,each piece of entry is 10KB
+         * LedgerId:2, read 1024 pieces of entry,each piece of entry is 10KB
+         * LedgerId:3, read 1024 pieces of entry,each piece of entry is 10KB
+         */
+        CacheResult cacheBatchSizeResult = readAheadCacheBatchSize();
+
+        /**
+         * result: case1(read ahead cache by limit bytes size) get less 
cachemiss,
+         * it is suitable for large messages, reduce the pollution of 
readAhead large messages to readCache
+         */
+        assertEquals(8, cacheBatchBytesSizeResult.getCacheMissCount());
+        assertEquals(132, cacheBatchSizeResult.getCacheMissCount());
+        assertTrue(cacheBatchBytesSizeResult.getCacheMissCount() < 
cacheBatchSizeResult.getCacheMissCount());
+        assertEquals(
+                cacheBatchBytesSizeResult.getCacheMissCount() + 
cacheBatchBytesSizeResult.getCacheHitCount(),
+                cacheBatchSizeResult.getCacheMissCount() + 
cacheBatchSizeResult.getCacheHitCount());
+    }
+
+    public void setup(TestDB testDB, long readAheadCacheMaxSizeMb,
+                      int readAheadCacheBatchSize, long 
readAheadCacheBatchBytesSize) throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = BookieImpl.getCurrentDirectory(tmpDir);
+        BookieImpl.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[]{tmpDir.toString()});
+        if (readAheadCacheMaxSizeMb > 0) {
+            conf.setProperty(READ_AHEAD_CACHE_MAX_SIZE_MB, 
readAheadCacheMaxSizeMb);
+        }
+        if (readAheadCacheBatchSize > 0) {
+            conf.setProperty(READ_AHEAD_CACHE_BATCH_SIZE, 
readAheadCacheBatchSize);
+        }
+        if (readAheadCacheBatchBytesSize > 0) {
+            conf.setProperty(READ_AHEAD_CACHE_BATCH_BYTES_SIZE, 
readAheadCacheBatchBytesSize);
+        }
+        TestStatsProvider.TestStatsLogger statsLogger = new 
TestStatsProvider().getStatsLogger("test");
+        BookieImpl bookie = new TestBookieImpl(new 
TestBookieImpl.ResourceBuilder(conf).build(statsLogger),
+                statsLogger);
+
+        DbLedgerStorage storage = (DbLedgerStorage) bookie.getLedgerStorage();
+
+        storage.getLedgerStorageList().forEach(singleDirectoryDbLedgerStorage 
-> {
+            assertTrue(singleDirectoryDbLedgerStorage.getEntryLogger() 
instanceof DefaultEntryLogger);
+        });
+        testDB.setStorage(storage);
+        testDB.setTmpDir(tmpDir);
+    }
+
+    public void teardown(DbLedgerStorage storage, File tmpDir) {
+        if (storage != null) {
+            try {
+                storage.shutdown();
+            } catch (InterruptedException e) {
+                LOGGER.error("storage.shutdown has error", e);
+            }
+        }
+        if (tmpDir != null) {
+            tmpDir.delete();
+        }
+    }
+
+    private void addEntries(DbLedgerStorage storage, long minLedgerId, long 
maxLedgerId,
+                            long minEntryId, long maxEntryId) throws Exception 
{
+        // Add entries
+        for (long lid = minLedgerId; lid < maxLedgerId; lid++) {
+            long lac = 0;
+            for (long eid = minEntryId; eid < maxEntryId; eid++) {
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writeLong(lid); // ledger id
+                entry.writeLong(eid); // entry id
+                entry.writeLong(lac); // lac
+                entry.writeBytes((get4KbMsg()).getBytes());
+                assertEquals(eid, storage.addEntry(entry));
+                lac++;
+            }
+        }
+    }
+
+    private String get4KbMsg() {
+        StringBuffer buffer = new StringBuffer();
+        for (int i = 0; i < 1024; i++) {
+            buffer.append("1234");
+        }
+        assertEquals(4 * 1024, buffer.toString().length());
+        return buffer.toString();
+    }
+
+    private CacheResult readAheadCacheBatchBytesSize() {
+        Long cacheMissCount;
+        TestDB testDB = new TestDB();
+        try {
+            long readAheadCacheMaxSizeMb = 2 * 8L;
+            int readAheadCacheBatchSize = 1024;
+            long readAheadCacheBatchBytesSize = 2 * 1024 * 1024;
+            long minEntryId = 0;
+            long maxEntryId = 1024;
+
+            setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize);
+            addEntries(testDB.getStorage(), 0, 4, minEntryId, maxEntryId);
+
+            testDB.getStorage().flush();
+            assertEquals(false, testDB.getStorage().isFlushRequired());
+            // Read from db
+            for (long eid = minEntryId; eid < maxEntryId / 2; eid++) {
+                testDB.getStorage().getEntry(0, eid);
+                testDB.getStorage().getEntry(1, eid);
+                testDB.getStorage().getEntry(2, eid);
+                testDB.getStorage().getEntry(3, eid);
+            }
+            List<SingleDirectoryDbLedgerStorage> ledgerStorageList = 
testDB.getStorage().getLedgerStorageList();
+            DbLedgerStorageStats ledgerStats = 
ledgerStorageList.get(0).getDbLedgerStorageStats();
+            cacheMissCount = ledgerStats.getReadCacheMissCounter().get();
+            Long cacheHitCount = ledgerStats.getReadCacheHitCounter().get();
+            LOGGER.info("simple1.cacheMissCount={},cacheHitCount={}", 
cacheMissCount, cacheHitCount);
+            return new CacheResult(cacheMissCount, cacheHitCount);
+        } catch (Throwable e) {
+            LOGGER.error("test case run error", e);
+            return new CacheResult(0, 0);
+        } finally {
+            teardown(testDB.getStorage(), testDB.getTmpDir());
+        }
+    }
+
+    public CacheResult readAheadCacheBatchSize() {
+        Long cacheMissCount;
+        TestDB testDB = new TestDB();
+        try {
+            long readAheadCacheMaxSizeMb = 2 * 8L;
+            int readAheadCacheBatchSize = 1024;
+            long readAheadCacheBatchBytesSize = -1;
+            long minEntryId = 0;
+            long maxEntryId = 1024;
+
+            setup(testDB, readAheadCacheMaxSizeMb, readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize);
+            addEntries(testDB.getStorage(), 0, 4, minEntryId, maxEntryId);
+
+            testDB.getStorage().flush();
+            assertEquals(false, testDB.getStorage().isFlushRequired());
+            // Read from db
+            for (long eid = minEntryId; eid < maxEntryId / 2; eid++) {
+                testDB.getStorage().getEntry(0, eid);
+                testDB.getStorage().getEntry(1, eid);
+                testDB.getStorage().getEntry(2, eid);
+                testDB.getStorage().getEntry(3, eid);
+            }
+            List<SingleDirectoryDbLedgerStorage> ledgerStorageList = 
testDB.getStorage().getLedgerStorageList();
+            DbLedgerStorageStats ledgerStats = 
ledgerStorageList.get(0).getDbLedgerStorageStats();
+            cacheMissCount = ledgerStats.getReadCacheMissCounter().get();
+            Long cacheHitCount = ledgerStats.getReadCacheHitCounter().get();
+            LOGGER.info("simple2.cacheMissCount={},cacheHitCount={}", 
cacheMissCount, cacheHitCount);
+            return new CacheResult(cacheMissCount, cacheHitCount);
+        } catch (Throwable e) {
+            LOGGER.error("test case run error", e);
+            return new CacheResult(0, 0);
+        } finally {
+            teardown(testDB.getStorage(), testDB.getTmpDir());
+        }
+    }
+
+    private class TestDB {
+        private DbLedgerStorage storage;
+        private File tmpDir;
+
+        public DbLedgerStorage getStorage() {
+            return storage;
+        }
+
+        public void setStorage(DbLedgerStorage storage) {
+            this.storage = storage;
+        }
+
+        public File getTmpDir() {
+            return tmpDir;
+        }
+
+        public void setTmpDir(File tmpDir) {
+            this.tmpDir = tmpDir;
+        }
+    }
+
+    private class CacheResult {
+        private long cacheMissCount;
+        private long cacheHitCount;
+
+        private CacheResult(long cacheMissCount, long cacheHitCount) {
+            this.cacheMissCount = cacheMissCount;
+            this.cacheHitCount = cacheHitCount;
+        }
+
+        public long getCacheMissCount() {
+            return cacheMissCount;
+        }
+
+        public void setCacheMissCount(long cacheMissCount) {
+            this.cacheMissCount = cacheMissCount;
+        }
+
+        public long getCacheHitCount() {
+            return cacheHitCount;
+        }
+
+        public void setCacheHitCount(long cacheHitCount) {
+            this.cacheHitCount = cacheHitCount;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index a9bee08cb2..102f7f5add 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -56,11 +56,11 @@ public class DbLedgerStorageWriteCacheTest {
         protected SingleDirectoryDbLedgerStorage 
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
             LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, 
LedgerDirsManager indexDirsManager,
             EntryLogger entryLogger, StatsLogger statsLogger,
-            long writeCacheSize, long readCacheSize, int 
readAheadCacheBatchSize)
+            long writeCacheSize, long readCacheSize, int 
readAheadCacheBatchSize, long readAheadCacheBatchBytesSize)
                 throws IOException {
             return new MockedSingleDirectoryDbLedgerStorage(conf, 
ledgerManager, ledgerDirsManager, indexDirsManager,
                 entryLogger, statsLogger, allocator, writeCacheSize,
-                readCacheSize, readAheadCacheBatchSize);
+                readCacheSize, readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize);
         }
 
         private static class MockedSingleDirectoryDbLedgerStorage extends 
SingleDirectoryDbLedgerStorage {
@@ -68,9 +68,11 @@ public class DbLedgerStorageWriteCacheTest {
                     LedgerDirsManager ledgerDirsManager, LedgerDirsManager 
indexDirsManager, EntryLogger entryLogger,
                     StatsLogger statsLogger,
                     ByteBufAllocator allocator, long writeCacheSize,
-                    long readCacheSize, int readAheadCacheBatchSize) throws 
IOException {
+                    long readCacheSize, int readAheadCacheBatchSize, long 
readAheadCacheBatchBytesSize)
+                    throws IOException {
                 super(conf, ledgerManager, ledgerDirsManager, 
indexDirsManager, entryLogger,
-                      statsLogger, allocator, writeCacheSize, readCacheSize, 
readAheadCacheBatchSize);
+                      statsLogger, allocator, writeCacheSize, readCacheSize, 
readAheadCacheBatchSize,
+                      readAheadCacheBatchBytesSize);
             }
 
           @Override
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index f7dbf38032..c0a021418d 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -749,6 +749,9 @@ gcEntryLogMetadataCacheEnabled=false
 # By default it will be allocated to 25% of the available direct memory
 # dbStorage_readAheadCacheMaxSizeMb=
 
+# How many entries' bytes to pre-fill in cache after a read cache miss. 
Default is -1. 0 or less disables this feature
+# dbStorage_readAheadCacheBatchBytesSize=-1
+
 # How many entries to pre-fill in cache after a read cache miss
 # dbStorage_readAheadCacheBatchSize=100
 

Reply via email to