This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit a6a407487ae4bd64caf25c06e27cfbe7c64c3f5d
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Sep 19 08:59:57 2022 -0700

    Speed up the rebuilding of RocksDB index (#3458)
    
    * Speed up the rebuilding of RocksDB index
    
    * fix check style
    
    Co-authored-by: chenhang <[email protected]>
    (cherry picked from commit 7004d994938e914561d8343a96d9d75ce98b2837)
---
 .../bookie/storage/ldb/LocationsIndexRebuildOp.java   | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
index 55c5c90c31..e15457ca8e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -32,7 +32,8 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
@@ -55,6 +56,8 @@ public class LocationsIndexRebuildOp {
         this.conf = conf;
     }
 
+    private static final int BATCH_COMMIT_SIZE = 10_000;
+
     public void initiate() throws IOException {
         LOG.info("Starting locations index rebuilding");
 
@@ -82,6 +85,8 @@ public class LocationsIndexRebuildOp {
         int totalEntryLogs = entryLogs.size();
         int completedEntryLogs = 0;
         LOG.info("Scanning {} entry logs", totalEntryLogs);
+        AtomicReference<KeyValueStorage.Batch> batch = new 
AtomicReference<>(newIndex.newBatch());
+        AtomicInteger count = new AtomicInteger();
 
         for (long entryLogId : entryLogs) {
             entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@@ -100,7 +105,15 @@ public class LocationsIndexRebuildOp {
                     // Update the ledger index page
                     LongPairWrapper key = LongPairWrapper.get(ledgerId, 
entryId);
                     LongWrapper value = LongWrapper.get(location);
-                    newIndex.put(key.array, value.array);
+                    batch.get().put(key.array, value.array);
+
+                    if (count.incrementAndGet() > BATCH_COMMIT_SIZE) {
+                        batch.get().flush();
+                        batch.get().close();
+
+                        batch.set(newIndex.newBatch());
+                        count.set(0);
+                    }
                 }
 
                 @Override
@@ -112,6 +125,8 @@ public class LocationsIndexRebuildOp {
             ++completedEntryLogs;
             LOG.info("Completed scanning of log {}.log -- {} / {}", 
Long.toHexString(entryLogId), completedEntryLogs,
                     totalEntryLogs);
+            batch.get().flush();
+            batch.get().close();
         }
 
         newIndex.sync();

Reply via email to