This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7004d99493 Speed up the rebuilding of RocksDB index (#3458)
7004d99493 is described below

commit 7004d994938e914561d8343a96d9d75ce98b2837
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Sep 19 08:59:57 2022 -0700

    Speed up the rebuilding of RocksDB index (#3458)
    
    * Speed up the rebuilding of RocksDB index
    
    * fix check style
    
    Co-authored-by: chenhang <[email protected]>
---
 .../bookie/storage/ldb/LocationsIndexRebuildOp.java   | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
index 4edbc2c3e9..37de152fb6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -31,6 +31,8 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.bookie.DefaultEntryLogger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
@@ -53,6 +55,8 @@ public class LocationsIndexRebuildOp {
         this.conf = conf;
     }
 
+    private static final int BATCH_COMMIT_SIZE = 10_000;
+
     public void initiate() throws IOException {
         LOG.info("Starting locations index rebuilding");
         File[] indexDirs = conf.getIndexDirs();
@@ -90,6 +94,8 @@ public class LocationsIndexRebuildOp {
             int totalEntryLogs = entryLogs.size();
             int completedEntryLogs = 0;
             LOG.info("Scanning {} entry logs", totalEntryLogs);
+            AtomicReference<KeyValueStorage.Batch> batch = new AtomicReference<>(newIndex.newBatch());
+            AtomicInteger count = new AtomicInteger();
 
             for (long entryLogId : entryLogs) {
                 entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@@ -108,7 +114,15 @@ public class LocationsIndexRebuildOp {
                         // Update the ledger index page
                         LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
                         LongWrapper value = LongWrapper.get(location);
-                        newIndex.put(key.array, value.array);
+                        batch.get().put(key.array, value.array);
+
+                        if (count.incrementAndGet() > BATCH_COMMIT_SIZE) {
+                            batch.get().flush();
+                            batch.get().close();
+
+                            batch.set(newIndex.newBatch());
+                            count.set(0);
+                        }
                     }
 
                     @Override
@@ -122,6 +136,9 @@ public class LocationsIndexRebuildOp {
                         completedEntryLogs, totalEntryLogs);
             }
 
+            batch.get().flush();
+            batch.get().close();
+
             newIndex.sync();
             newIndex.close();
         }

Reply via email to