This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit a6a407487ae4bd64caf25c06e27cfbe7c64c3f5d Author: Matteo Merli <[email protected]> AuthorDate: Mon Sep 19 08:59:57 2022 -0700 Speed up the rebuildinding of RocksDB index (#3458) * Speed up the rebuildinding of RocksDB index * fix check style Co-authored-by: chenhang <[email protected]> (cherry picked from commit 7004d994938e914561d8343a96d9d75ce98b2837) --- .../bookie/storage/ldb/LocationsIndexRebuildOp.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 55c5c90c31..e15457ca8e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -32,7 +32,8 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; @@ -55,6 +56,8 @@ public class LocationsIndexRebuildOp { this.conf = conf; } + private static final int BATCH_COMMIT_SIZE = 10_000; + public void initiate() throws IOException { LOG.info("Starting locations index rebuilding"); @@ -82,6 +85,8 @@ public class LocationsIndexRebuildOp { int totalEntryLogs = entryLogs.size(); int completedEntryLogs = 0; LOG.info("Scanning {} entry logs", totalEntryLogs); + AtomicReference<KeyValueStorage.Batch> batch = new AtomicReference<>(newIndex.newBatch()); + AtomicInteger count = new AtomicInteger(); for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @@ -100,7 +105,15 @@ public class LocationsIndexRebuildOp { // Update the ledger index page LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); LongWrapper value = LongWrapper.get(location); - newIndex.put(key.array, value.array); + batch.get().put(key.array, value.array); + + if (count.incrementAndGet() > BATCH_COMMIT_SIZE) { + batch.get().flush(); + batch.get().close(); + + batch.set(newIndex.newBatch()); + count.set(0); + } } @Override @@ -112,6 +125,8 @@ public class LocationsIndexRebuildOp { ++completedEntryLogs; LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs, totalEntryLogs); + batch.get().flush(); + batch.get().close(); } newIndex.sync();
