This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7004d99493 Speed up the rebuilding of RocksDB index (#3458)
7004d99493 is described below
commit 7004d994938e914561d8343a96d9d75ce98b2837
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Sep 19 08:59:57 2022 -0700
Speed up the rebuilding of RocksDB index (#3458)
* Speed up the rebuilding of RocksDB index
* fix check style
Co-authored-by: chenhang <[email protected]>
---
.../bookie/storage/ldb/LocationsIndexRebuildOp.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
index 4edbc2c3e9..37de152fb6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -31,6 +31,8 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
@@ -53,6 +55,8 @@ public class LocationsIndexRebuildOp {
this.conf = conf;
}
+ private static final int BATCH_COMMIT_SIZE = 10_000;
+
public void initiate() throws IOException {
LOG.info("Starting locations index rebuilding");
File[] indexDirs = conf.getIndexDirs();
@@ -90,6 +94,8 @@ public class LocationsIndexRebuildOp {
int totalEntryLogs = entryLogs.size();
int completedEntryLogs = 0;
LOG.info("Scanning {} entry logs", totalEntryLogs);
+ AtomicReference<KeyValueStorage.Batch> batch = new
AtomicReference<>(newIndex.newBatch());
+ AtomicInteger count = new AtomicInteger();
for (long entryLogId : entryLogs) {
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@@ -108,7 +114,15 @@ public class LocationsIndexRebuildOp {
// Update the ledger index page
LongPairWrapper key = LongPairWrapper.get(ledgerId,
entryId);
LongWrapper value = LongWrapper.get(location);
- newIndex.put(key.array, value.array);
+ batch.get().put(key.array, value.array);
+
+ if (count.incrementAndGet() > BATCH_COMMIT_SIZE) {
+ batch.get().flush();
+ batch.get().close();
+
+ batch.set(newIndex.newBatch());
+ count.set(0);
+ }
}
@Override
@@ -122,6 +136,9 @@ public class LocationsIndexRebuildOp {
completedEntryLogs, totalEntryLogs);
}
+ batch.get().flush();
+ batch.get().close();
+
newIndex.sync();
newIndex.close();
}