This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.14 by this push:
     new e56d6d67ab revert PR 3653 and make delete entries batch size 
configurable (#3768)
e56d6d67ab is described below

commit e56d6d67ab9be3f0bedb010d849e501723d72d54
Author: Hang Chen <[email protected]>
AuthorDate: Wed Feb 8 14:52:10 2023 +0800

    revert PR 3653 and make delete entries batch size configurable (#3768)
    
    ### Motivations
    This PR resolves issue https://github.com/apache/bookkeeper/issues/3734
    in branch-4.14 by following the suggestion in
    https://github.com/apache/bookkeeper/issues/3734#issuecomment-1407338171
    
    ### Modifications
    1. Revert https://github.com/apache/bookkeeper/pull/3653
    2. Backport https://github.com/apache/bookkeeper/pull/3646 to branch-4.14
       to make the delete-entries batch size configurable.
---
 .../bookie/storage/ldb/EntryLocationIndex.java     | 76 ++++++++++++++++++----
 .../bookkeeper/conf/ServerConfiguration.java       | 21 ++++++
 conf/bk_server.conf                                |  4 ++
 3 files changed, 88 insertions(+), 13 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index b009d2cbb3..f07f870b1a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -49,12 +49,14 @@ public class EntryLocationIndex implements Closeable {
 
     private final KeyValueStorage locationsDb;
     private final ConcurrentLongHashSet deletedLedgers = new 
ConcurrentLongHashSet();
+    private final int deleteEntriesBatchSize;
 
     private final EntryLocationIndexStats stats;
 
     public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory 
storageFactory, String basePath,
             StatsLogger stats) throws IOException {
         locationsDb = storageFactory.newKeyValueStorage(basePath, "locations", 
DbConfigType.Huge, conf);
+        deleteEntriesBatchSize = conf.getRocksDBDeleteEntriesBatchSize();
 
         this.stats = new EntryLocationIndexStats(
             stats,
@@ -185,11 +187,10 @@ public class EntryLocationIndex implements Closeable {
         deletedLedgers.add(ledgerId);
     }
 
-    private static final int DELETE_ENTRIES_BATCH_SIZE = 100000;
-
     public void removeOffsetFromDeletedLedgers() throws IOException {
         LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
         LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
+        LongPairWrapper keyToDelete = LongPairWrapper.get(-1, -1);
 
         Set<Long> ledgersToDelete = deletedLedgers.items();
 
@@ -199,30 +200,79 @@ public class EntryLocationIndex implements Closeable {
 
         log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
         long startTime = System.nanoTime();
+        long deletedEntries = 0;
+        long deletedEntriesInBatch = 0;
+
+        Batch batch = locationsDb.newBatch();
 
-        try (Batch batch = locationsDb.newBatch()) {
+        try {
             for (long ledgerId : ledgersToDelete) {
                 if (log.isDebugEnabled()) {
                     log.debug("Deleting indexes from ledger {}", ledgerId);
                 }
-
                 firstKeyWrapper.set(ledgerId, 0);
                 lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
 
-                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
-            }
+                Entry<byte[], byte[]> firstKeyRes = 
locationsDb.getCeil(firstKeyWrapper.array);
+                if (firstKeyRes == null || 
ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) {
+                    // No entries found for ledger
+                    if (log.isDebugEnabled()) {
+                        log.debug("No entries found for ledger {}", ledgerId);
+                    }
+                    continue;
+                }
 
-            batch.flush();
-            for (long ledgerId : ledgersToDelete) {
-                deletedLedgers.remove(ledgerId);
+                long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8);
+                long lastEntryId;
+                try {
+                    lastEntryId = getLastEntryInLedgerInternal(ledgerId);
+                } catch (Bookie.NoEntryException nee) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("No last entry id found for ledger {}", 
ledgerId);
+                    }
+                    continue;
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Deleting index for ledger {} entries ({} -> 
{})",
+                        ledgerId, firstEntryId, lastEntryId);
+                }
+
+                // Iterate over all the keys and remove each of them
+                for (long entryId = firstEntryId; entryId <= lastEntryId; 
entryId++) {
+                    keyToDelete.set(ledgerId, entryId);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Deleting index for ({}, {})", 
keyToDelete.getFirst(), keyToDelete.getSecond());
+                    }
+                    batch.remove(keyToDelete.array);
+                    ++deletedEntriesInBatch;
+                    ++deletedEntries;
+                }
+
+                if (deletedEntriesInBatch > deleteEntriesBatchSize) {
+                    batch.flush();
+                    batch.clear();
+                    deletedEntriesInBatch = 0;
+                }
             }
         } finally {
-            firstKeyWrapper.recycle();
-            lastKeyWrapper.recycle();
+            try {
+                batch.flush();
+                batch.clear();
+            } finally {
+                firstKeyWrapper.recycle();
+                lastKeyWrapper.recycle();
+                keyToDelete.recycle();
+                batch.close();
+            }
         }
 
-        log.info("Deleted indexes from {} ledgers in {} seconds", 
ledgersToDelete.size(),
-                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 
1000.0);
+        log.info("Deleted indexes for {} entries from {} ledgers in {} 
seconds", deletedEntries, ledgersToDelete.size(),
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 
1000.0);
+
+        // Removed from pending set
+        for (long ledgerId : ledgersToDelete) {
+            deletedLedgers.remove(ledgerId);
+        }
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(EntryLocationIndex.class);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 05b199729d..367132bc47 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -307,6 +307,7 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
 
     // Certificate role based authorization
     protected static final String AUTHORIZED_ROLES = "authorizedRoles";
+    protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = 
"rocksDBDeleteEntriesBatchSize";
 
     /**
      * Construct a default configuration object.
@@ -3614,4 +3615,24 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     public int getInFlightReadEntryNumInLedgerChecker(){
         return getInt(IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER, -1);
     }
+
+    /**
+     * Get entry log location index delete entries batch size from RocksDB.
+     *
+     * @return Int rocksDB delete entries batch size configured in Service 
configuration.
+     */
+    public int getRocksDBDeleteEntriesBatchSize() {
+        return getInt(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE, 100000);
+    }
+
+    /**
+     * Set entry log location index delete entries batch size from RocksDB.
+     *
+     * @param rocksDBDeleteEntriesBatchSize
+     * @return
+     */
+    public ServerConfiguration setRocksDBDeleteEntriesBatchSize(int 
rocksDBDeleteEntriesBatchSize) {
+        this.setProperty(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE, 
rocksDBDeleteEntriesBatchSize);
+        return this;
+    }
 }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 2a22b7ef7f..1d275586fa 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -714,6 +714,10 @@ ledgerDirectories=/tmp/bk-data
 # Default is to use 10% / numberOfLedgers of the direct memory size
 # dbStorage_rocksDB_blockCacheSize=
 
+# entry log location index delete entries batch size from RocksDB.
+# Default is 100000
+# rocksDBDeleteEntriesBatchSize=100000
+
 # Other RocksDB specific tunables
 # dbStorage_rocksDB_writeBufferSizeMB=64
 # dbStorage_rocksDB_sstSizeInMB=64

Reply via email to