This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 063cc8b7cb0bb19fd69238a029868a54972010b1 Author: Andrey Yegorov <[email protected]> AuthorDate: Fri Jul 22 16:51:01 2022 -0700 [Issue 3389] Prioritize compaction of entry logs with the lowest amount of remaining usable data (#3390) Descriptions of the changes in this PR: ### Motivation Prioritize compaction to free up more space faster. ### Changes doCompactEntryLogs() iterates over entry logs in whatever natural order they happen to be in, picks the first one with usage below the threshold and starts compacting it. Added a priority queue of entry logs (implemented as per-usage-bucket lists that are processed from the lowest-usage bucket upwards) to pick the ones with the most compactable space first. This also helps when the time for compaction is limited (via majorCompactionMaxTimeMillis / minorCompactionMaxTimeMillis): instead of spending time on rewriting files with more data, we pick the files with the least amount of data first. Master Issue: #3389 (cherry picked from commit 1825677b1ebacef113423b4afc463a0dcdc8988e) --- .../bookkeeper/bookie/EntryLogMetadataMap.java | 8 +++ .../bookkeeper/bookie/GarbageCollectorThread.java | 64 ++++++++++++++++++---- .../bookie/InMemoryEntryLogMetadataMap.java | 6 ++ .../storage/ldb/PersistentEntryLogMetadataMap.java | 61 ++++++++++++++++----- .../ldb/PersistentEntryLogMetadataMapTest.java | 14 +++++ 5 files changed, 129 insertions(+), 24 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java index 88f6ce5398..afd5c7f4d0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java @@ -59,6 +59,14 @@ public interface EntryLogMetadataMap extends Closeable { */ void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException; + /** + * Performs the given action for the key. 
+ * + * @param action + * @throws EntryLogMetadataMapException + */ + void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException; + /** * Removes entryLogMetadata record from the map. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 6e3d00c3d4..c88fa0bf4d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -28,6 +28,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -545,6 +547,11 @@ public class GarbageCollectorThread extends SafeRunnable { int[] entryLogUsageBuckets = new int[numBuckets]; int[] compactedBuckets = new int[numBuckets]; + ArrayList<LinkedList<Long>> compactableBuckets = new ArrayList<>(numBuckets); + for (int i = 0; i < numBuckets; i++) { + compactableBuckets.add(new LinkedList<>()); + } + long start = System.currentTimeMillis(); MutableLong end = new MutableLong(start); MutableLong timeDiff = new MutableLong(0); @@ -559,25 +566,62 @@ public class GarbageCollectorThread extends SafeRunnable { } if (meta.getUsage() >= threshold || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) { - // We allow the usage limit calculation to continue so that we get a accurate + // We allow the usage limit calculation to continue so that we get an accurate // report of where the usage was prior to running compaction. 
return; } - if (LOG.isDebugEnabled()) { - LOG.debug("Compacting entry log {} with usage {} below threshold {}", - meta.getEntryLogId(), meta.getUsage(), threshold); - } - long priorRemainingSize = meta.getRemainingSize(); - compactEntryLog(meta); - gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize); - compactedBuckets[bucketIndex]++; + compactableBuckets.get(bucketIndex).add(meta.getEntryLogId()); }); + + LOG.info( + "Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}", + entryLogUsageBuckets); + + final int maxBucket = calculateUsageIndex(numBuckets, threshold); + stopCompaction: + for (int currBucket = 0; currBucket <= maxBucket; currBucket++) { + LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket); + while (!entryLogIds.isEmpty()) { + if (timeDiff.getValue() < maxTimeMillis) { + end.setValue(System.currentTimeMillis()); + timeDiff.setValue(end.getValue() - start); + } + + if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) { + // We allow the usage limit calculation to continue so that we get an accurate + // report of where the usage was prior to running compaction. 
+ break stopCompaction; + } + + final int bucketIndex = currBucket; + final long logId = entryLogIds.remove(); + + entryLogMetaMap.forKey(logId, (entryLogId, meta) -> { + if (meta == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Metadata for entry log {} already deleted", logId); + } + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Compacting entry log {} with usage {} below threshold {}", + meta.getEntryLogId(), meta.getUsage(), threshold); + } + + long priorRemainingSize = meta.getRemainingSize(); + compactEntryLog(meta); + gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize); + compactedBuckets[bucketIndex]++; + }); + } + } + if (LOG.isDebugEnabled()) { if (!running) { LOG.debug("Compaction exited due to gc not running"); } - if (timeDiff.getValue() > maxTimeMillis) { + if (maxTimeMillis > 0 && timeDiff.getValue() > maxTimeMillis) { LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java index 106a382f54..3fec798a0d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java @@ -47,6 +47,12 @@ public class InMemoryEntryLogMetadataMap implements EntryLogMetadataMap { entryLogMetaMap.forEach(action); } + @Override + public void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) + throws BookieException.EntryLogMetadataMapException { + action.accept(entryLogId, entryLogMetaMap.get(entryLogId)); + } + @Override public void remove(long entryLogId) { entryLogMetaMap.remove(entryLogId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java index ec289fd16c..f3f99be571 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java @@ -139,20 +139,7 @@ public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap { } Entry<byte[], byte[]> entry = iterator.next(); long entryLogId = ArrayUtil.getLong(entry.getKey(), 0); - ByteArrayInputStream localBais = bais.get(); - DataInputStream localDatais = datais.get(); - if (localBais.available() < entry.getValue().length) { - localBais.close(); - localDatais.close(); - ByteArrayInputStream newBais = new ByteArrayInputStream(entry.getValue()); - bais.set(newBais); - datais.set(new DataInputStream(newBais)); - } else { - localBais.read(entry.getValue(), 0, entry.getValue().length); - } - localBais.reset(); - localDatais.reset(); - EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(datais.get()); + EntryLogMetadataRecyclable metadata = getEntryLogMetadataRecyclable(entry.getValue()); try { action.accept(entryLogId, metadata); } finally { @@ -171,6 +158,52 @@ public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap { } } + /** + * {@link EntryLogMetadata} life-cycle in supplied action will be transient + * and it will be recycled as soon as supplied action is completed. 
+ */ + @Override + public void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException { + throwIfClosed(); + LongWrapper key = LongWrapper.get(entryLogId); + try { + byte[] value = metadataMapDB.get(key.array); + if (value == null || value.length == 0) { + action.accept(entryLogId, null); + return; + } + EntryLogMetadataRecyclable metadata = getEntryLogMetadataRecyclable(value); + try { + action.accept(entryLogId, metadata); + } finally { + metadata.recycle(); + } + } catch (IOException e) { + log.error("Failed to get metadata for entryLogId {}: {}", entryLogId, e.getMessage(), e); + throw new EntryLogMetadataMapException(e); + } finally { + key.recycle(); + } + } + + private EntryLogMetadataRecyclable getEntryLogMetadataRecyclable(byte[] value) throws IOException { + ByteArrayInputStream localBais = bais.get(); + DataInputStream localDatais = datais.get(); + if (localBais.available() < value.length) { + localBais.close(); + localDatais.close(); + ByteArrayInputStream newBais = new ByteArrayInputStream(value); + bais.set(newBais); + datais.set(new DataInputStream(newBais)); + } else { + localBais.read(value, 0, value.length); + } + localBais.reset(); + localDatais.reset(); + EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(datais.get()); + return metadata; + } + @Override public void remove(long entryLogId) throws EntryLogMetadataMapException { throwIfClosed(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java index 243fe692ec..205c347add 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java @@ -28,6 +28,7 @@ import 
com.google.common.collect.Lists; import java.io.File; import java.util.List; +import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.EntryLogMetadata; import org.apache.bookkeeper.conf.ServerConfiguration; import org.junit.Rule; @@ -80,6 +81,19 @@ public class PersistentEntryLogMetadataMapTest { } }); + metadatas.forEach(meta -> { + long logId = meta.getEntryLogId(); + try { + entryMetadataMap.forKey(logId, (entryLogId, persistedMeta) -> { + assertEquals(meta.getEntryLogId(), persistedMeta.getEntryLogId()); + assertEquals(meta.getTotalSize(), persistedMeta.getTotalSize()); + assertEquals(logId, (long) entryLogId); + }); + } catch (BookieException.EntryLogMetadataMapException e) { + throw new RuntimeException(e); + } + }); + // remove entry-log entry for (int i = 1; i <= totalMetadata; i++) { entryMetadataMap.remove(i);
