This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new dca471d Avoid usage of RocksDB deleteRange() in DbLedgerStorage
dca471d is described below
commit dca471dfddbdbdea3dac029ddd76958340fa6629
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Aug 23 10:19:02 2018 -0700
Avoid usage of RocksDB deleteRange() in DbLedgerStorage
### Motivation
There are a few issues that can be traced back to a performance degradation in
RocksDB when using the deleteRange() feature (e.g.,
https://github.com/apache/incubator-pulsar/issues/1737 and others).
There is an ongoing discussion in the RocksDB project about addressing this issue:
https://github.com/facebook/rocksdb/issues/3959
In the meantime, we should roll back the change and avoid using deleteRange()
until these issues are resolved.
### Changes
This PR essentially reverts the commit
https://github.com/yahoo/bookkeeper/commit/4b849904bcd65b49cf963e6508dc7fb745f56294
from the Yahoo branch (which was squashed when merging back to Apache).
The only addition here is to use `DELETE_ENTRIES_BATCH_SIZE` to amortize
the cost of `batch.flush()` when there are many ledgers with few entries.
Author: Matteo Merli <[email protected]>
Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli
<[email protected]>, Sijie Guo <[email protected]>
This closes #1620 from merlimat/rollback-delete-range
---
.../bookie/storage/ldb/EntryLocationIndex.java | 65 ++++++++++++++++++----
1 file changed, 55 insertions(+), 10 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index 53e37a2..21b87e2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.EntryLocation;
@@ -188,6 +189,8 @@ public class EntryLocationIndex implements Closeable {
deletedLedgers.add(ledgerId);
}
+ private static final int DELETE_ENTRIES_BATCH_SIZE = 100000;
+
public void removeOffsetFromDeletedLedgers() throws IOException {
LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
@@ -200,6 +203,10 @@ public class EntryLocationIndex implements Closeable {
}
log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
+ long startTime = System.nanoTime();
+ long deletedEntries = 0;
+ long deletedEntriesInBatch = 0;
+
Batch batch = locationsDb.newBatch();
try {
@@ -211,20 +218,58 @@ public class EntryLocationIndex implements Closeable {
firstKeyWrapper.set(ledgerId, 0);
lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
- batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
- }
+ Entry<byte[], byte[]> firstKeyRes =
locationsDb.getCeil(firstKeyWrapper.array);
+ if (firstKeyRes == null ||
ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) {
+ // No entries found for ledger
+ if (log.isDebugEnabled()) {
+ log.debug("No entries found for ledger {}", ledgerId);
+ }
+ continue;
+ }
- batch.flush();
+ long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8);
+ long lastEntryId = getLastEntryInLedgerInternal(ledgerId);
+ if (log.isDebugEnabled()) {
+ log.debug("Deleting index for ledger {} entries ({} ->
{})",
+ ledgerId, firstEntryId, lastEntryId);
+ }
- // Removed from pending set
- for (long ledgerId : ledgersToDelete) {
- deletedLedgers.remove(ledgerId);
+ // Iterate over all the keys and remove each of them
+ for (long entryId = firstEntryId; entryId <= lastEntryId;
entryId++) {
+ keyToDelete.set(ledgerId, entryId);
+ if (log.isDebugEnabled()) {
+ log.debug("Deleting index for ({}, {})",
keyToDelete.getFirst(), keyToDelete.getSecond());
+ }
+ batch.remove(keyToDelete.array);
+ ++deletedEntriesInBatch;
+ ++deletedEntries;
+ }
+
+ if (deletedEntriesInBatch > DELETE_ENTRIES_BATCH_SIZE) {
+ batch.flush();
+ batch.clear();
+ deletedEntriesInBatch = 0;
+ }
}
} finally {
- firstKeyWrapper.recycle();
- lastKeyWrapper.recycle();
- keyToDelete.recycle();
- batch.close();
+ try {
+ batch.flush();
+ batch.clear();
+ } finally {
+
+ firstKeyWrapper.recycle();
+ lastKeyWrapper.recycle();
+ keyToDelete.recycle();
+ batch.close();
+ }
+ }
+
+ log.info("Deleted indexes for {} entries from {} ledgers in {}
seconds", deletedEntries, ledgersToDelete.size(),
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) /
1000.0);
+
+ // Removed from pending set
+ for (long ledgerId : ledgersToDelete) {
+ deletedLedgers.remove(ledgerId);
}
}