This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.14 by this push:
new e56d6d67ab revert PR 3653 and make delete entries batch size
configurable (#3768)
e56d6d67ab is described below
commit e56d6d67ab9be3f0bedb010d849e501723d72d54
Author: Hang Chen <[email protected]>
AuthorDate: Wed Feb 8 14:52:10 2023 +0800
revert PR 3653 and make delete entries batch size configurable (#3768)
### Motivations
This PR is to resolve the issue
https://github.com/apache/bookkeeper/issues/3734 in branch-4.14 by following
this suggestion.
https://github.com/apache/bookkeeper/issues/3734#issuecomment-1407338171
### Modifications
1. Revert https://github.com/apache/bookkeeper/pull/3653
2. Bring https://github.com/apache/bookkeeper/pull/3646 to branch-4.14 to
make delete entries batch size configurable.
---
.../bookie/storage/ldb/EntryLocationIndex.java | 76 ++++++++++++++++++----
.../bookkeeper/conf/ServerConfiguration.java | 21 ++++++
conf/bk_server.conf | 4 ++
3 files changed, 88 insertions(+), 13 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index b009d2cbb3..f07f870b1a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -49,12 +49,14 @@ public class EntryLocationIndex implements Closeable {
private final KeyValueStorage locationsDb;
private final ConcurrentLongHashSet deletedLedgers = new
ConcurrentLongHashSet();
+ private final int deleteEntriesBatchSize;
private final EntryLocationIndexStats stats;
public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory
storageFactory, String basePath,
StatsLogger stats) throws IOException {
locationsDb = storageFactory.newKeyValueStorage(basePath, "locations",
DbConfigType.Huge, conf);
+ deleteEntriesBatchSize = conf.getRocksDBDeleteEntriesBatchSize();
this.stats = new EntryLocationIndexStats(
stats,
@@ -185,11 +187,10 @@ public class EntryLocationIndex implements Closeable {
deletedLedgers.add(ledgerId);
}
- private static final int DELETE_ENTRIES_BATCH_SIZE = 100000;
-
public void removeOffsetFromDeletedLedgers() throws IOException {
LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
+ LongPairWrapper keyToDelete = LongPairWrapper.get(-1, -1);
Set<Long> ledgersToDelete = deletedLedgers.items();
@@ -199,30 +200,79 @@ public class EntryLocationIndex implements Closeable {
log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
long startTime = System.nanoTime();
+ long deletedEntries = 0;
+ long deletedEntriesInBatch = 0;
+
+ Batch batch = locationsDb.newBatch();
- try (Batch batch = locationsDb.newBatch()) {
+ try {
for (long ledgerId : ledgersToDelete) {
if (log.isDebugEnabled()) {
log.debug("Deleting indexes from ledger {}", ledgerId);
}
-
firstKeyWrapper.set(ledgerId, 0);
lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
- batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
- }
+ Entry<byte[], byte[]> firstKeyRes =
locationsDb.getCeil(firstKeyWrapper.array);
+ if (firstKeyRes == null ||
ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) {
+ // No entries found for ledger
+ if (log.isDebugEnabled()) {
+ log.debug("No entries found for ledger {}", ledgerId);
+ }
+ continue;
+ }
- batch.flush();
- for (long ledgerId : ledgersToDelete) {
- deletedLedgers.remove(ledgerId);
+ long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8);
+ long lastEntryId;
+ try {
+ lastEntryId = getLastEntryInLedgerInternal(ledgerId);
+ } catch (Bookie.NoEntryException nee) {
+ if (log.isDebugEnabled()) {
+ log.debug("No last entry id found for ledger {}",
ledgerId);
+ }
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Deleting index for ledger {} entries ({} ->
{})",
+ ledgerId, firstEntryId, lastEntryId);
+ }
+
+ // Iterate over all the keys and remove each of them
+ for (long entryId = firstEntryId; entryId <= lastEntryId;
entryId++) {
+ keyToDelete.set(ledgerId, entryId);
+ if (log.isDebugEnabled()) {
+ log.debug("Deleting index for ({}, {})",
keyToDelete.getFirst(), keyToDelete.getSecond());
+ }
+ batch.remove(keyToDelete.array);
+ ++deletedEntriesInBatch;
+ ++deletedEntries;
+ }
+
+ if (deletedEntriesInBatch > deleteEntriesBatchSize) {
+ batch.flush();
+ batch.clear();
+ deletedEntriesInBatch = 0;
+ }
}
} finally {
- firstKeyWrapper.recycle();
- lastKeyWrapper.recycle();
+ try {
+ batch.flush();
+ batch.clear();
+ } finally {
+ firstKeyWrapper.recycle();
+ lastKeyWrapper.recycle();
+ keyToDelete.recycle();
+ batch.close();
+ }
}
- log.info("Deleted indexes from {} ledgers in {} seconds",
ledgersToDelete.size(),
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) /
1000.0);
+ log.info("Deleted indexes for {} entries from {} ledgers in {}
seconds", deletedEntries, ledgersToDelete.size(),
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) /
1000.0);
+
+ // Removed from pending set
+ for (long ledgerId : ledgersToDelete) {
+ deletedLedgers.remove(ledgerId);
+ }
}
private static final Logger log =
LoggerFactory.getLogger(EntryLocationIndex.class);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 05b199729d..367132bc47 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -307,6 +307,7 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
// Certificate role based authorization
protected static final String AUTHORIZED_ROLES = "authorizedRoles";
+ protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE =
"rocksDBDeleteEntriesBatchSize";
/**
* Construct a default configuration object.
@@ -3614,4 +3615,24 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
public int getInFlightReadEntryNumInLedgerChecker(){
return getInt(IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER, -1);
}
+
+ /**
+ * Get entry log location index delete entries batch size from RocksDB.
+ *
+ * @return Int rocksDB delete entries batch size configured in Service
configuration.
+ */
+ public int getRocksDBDeleteEntriesBatchSize() {
+ return getInt(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE, 100000);
+ }
+
+ /**
+ * Set entry log location index delete entries batch size from RocksDB.
+ *
+ * @param rocksDBDeleteEntriesBatchSize
+ * @return
+ */
+ public ServerConfiguration setRocksDBDeleteEntriesBatchSize(int
rocksDBDeleteEntriesBatchSize) {
+ this.setProperty(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE,
rocksDBDeleteEntriesBatchSize);
+ return this;
+ }
}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 2a22b7ef7f..1d275586fa 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -714,6 +714,10 @@ ledgerDirectories=/tmp/bk-data
# Default is to use 10% / numberOfLedgers of the direct memory size
# dbStorage_rocksDB_blockCacheSize=
+# entry log location index delete entries batch size from RocksDB.
+# Default is 100000
+# rocksDBDeleteEntriesBatchSize=100000
+
# Other RocksDB specific tunables
# dbStorage_rocksDB_writeBufferSizeMB=64
# dbStorage_rocksDB_sstSizeInMB=64