This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 2fad33bfcf Add small files check in garbage collection (#3631)
2fad33bfcf is described below
commit 2fad33bfcf24a72f7fdf103969ed4b0aa26778a2
Author: Hang Chen <[email protected]>
AuthorDate: Thu Mar 16 11:28:48 2023 +0800
Add small files check in garbage collection (#3631)
### Motivation
When `TransactionalEntryLogCompactor` is used to compact entry log files, it
generates a lot of small entry log files. The usage of those files is usually
greater than 90%, so they cannot be compacted again unless their usage
decreases.

### Changes
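We introduce a new option, `useTargetEntryLogSizeForGc`, that controls how the
garbage collector calculates entry log usage during compaction.
When it is enabled and an entry log's usage is below 100%, the usage is
calculated as the log's remaining size divided by the larger of its total size
and `logSizeLimit`, instead of relative to the actual file size alone. Small
entry log files therefore report a lower usage and can be compacted even though
more than 90% of their data is still live. This feature is disabled by default
(`useTargetEntryLogSizeForGc=false`).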
---
.../bookkeeper/bookie/GarbageCollectorThread.java | 11 +-
.../bookkeeper/conf/ServerConfiguration.java | 10 ++
.../bookie/GarbageCollectorThreadTest.java | 135 +++++++++++++++++++++
conf/bk_server.conf | 9 ++
4 files changed, 162 insertions(+), 3 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index d17450426e..1420c5ca6f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -561,15 +561,20 @@ public class GarbageCollectorThread implements Runnable {
MutableLong timeDiff = new MutableLong(0);
entryLogMetaMap.forEach((entryLogId, meta) -> {
- int bucketIndex = calculateUsageIndex(numBuckets, meta.getUsage());
+ double usage = meta.getUsage();
+ if (conf.isUseTargetEntryLogSizeForGc() && usage < 1.0d) {
+ usage = (double) meta.getRemainingSize() /
Math.max(meta.getTotalSize(), conf.getEntryLogSizeLimit());
+ }
+ int bucketIndex = calculateUsageIndex(numBuckets, usage);
entryLogUsageBuckets[bucketIndex]++;
if (timeDiff.getValue() < maxTimeMillis) {
end.setValue(System.currentTimeMillis());
timeDiff.setValue(end.getValue() - start);
}
- if (meta.getUsage() >= threshold || (maxTimeMillis > 0 &&
timeDiff.getValue() >= maxTimeMillis)
- || !running) {
+ if ((usage >= threshold
+ || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
+ || !running)) {
// We allow the usage limit calculation to continue so that we
get an accurate
// report of where the usage was prior to running compaction.
return;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 3fb8d18ef9..4db536ab49 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -116,6 +116,7 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
protected static final String VERIFY_METADATA_ON_GC = "verifyMetadataOnGC";
protected static final String GC_ENTRYLOGMETADATA_CACHE_ENABLED =
"gcEntryLogMetadataCacheEnabled";
protected static final String GC_ENTRYLOG_METADATA_CACHE_PATH =
"gcEntryLogMetadataCachePath";
+ protected static final String USE_TARGET_ENTRYLOG_SIZE_FOR_GC =
"useTargetEntryLogSizeForGc";
// Scrub Parameters
protected static final String LOCAL_SCRUB_PERIOD = "localScrubInterval";
protected static final String LOCAL_SCRUB_RATE_LIMIT =
"localScrubRateLimit";
@@ -554,6 +555,15 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
return this;
}
+ /**
+ * Get whether garbage collection computes entry log usage against the configured
+ * entry log size limit instead of the actual entry log file size.
+ *
+ * <p>When enabled, an entry log whose usage is below 1.0 has its usage recomputed as its
+ * remaining size divided by the larger of its total size and {@link #getEntryLogSizeLimit()},
+ * so small entry logs that are mostly live can still fall below the compaction threshold.
+ *
+ * @return true if the target entry log size is used for GC usage calculation; false by default
+ */
+ public boolean isUseTargetEntryLogSizeForGc() {
+ return getBoolean(USE_TARGET_ENTRYLOG_SIZE_FOR_GC, false);
+ }
+
+ /**
+ * Set whether garbage collection computes entry log usage against the configured
+ * entry log size limit instead of the actual entry log file size.
+ *
+ * @param useTargetEntryLogSizeForGc true to use the target entry log size for GC usage calculation
+ * @return server configuration
+ */
+ public ServerConfiguration setUseTargetEntryLogSizeForGc(boolean
useTargetEntryLogSizeForGc) {
+ this.setProperty(USE_TARGET_ENTRYLOG_SIZE_FOR_GC,
useTargetEntryLogSizeForGc);
+ return this;
+ }
+
/**
* Get whether local scrub is enabled.
*
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
index 31828423a4..ebe07ce230 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
@@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
@@ -238,4 +239,138 @@ public class GarbageCollectorThreadTest {
assertTrue(entryLogMetaMap.isEmpty());
assertFalse(entryLogger.logExists(logId3));
}
+
+ @Test
+ public void testCompactionWithFileSizeCheck() throws Exception {
+ File ledgerDir = tmpDirs.createNew("testFileSize", "ledgers");
+ EntryLogger entryLogger = newLegacyEntryLogger(20000, ledgerDir);
+
+ MockLedgerStorage storage = new MockLedgerStorage();
+ MockLedgerManager lm = new MockLedgerManager();
+
+ GarbageCollectorThread gcThread = new GarbageCollectorThread(
+ TestBKConfiguration.newServerConfiguration().setUseTargetEntryLogSizeForGc(true), lm,
+ newDirsManager(ledgerDir),
+ storage, entryLogger, NullStatsLogger.INSTANCE);
+
+ // Add entries.
+ // Ledger 1 is on first entry log
+ // Ledger 2 spans first, second and third entry log
+ // Ledger 3 spans the third and fourth entry logs (the fourth is still active when extracting meta)
+ long loc1 = entryLogger.addEntry(1L, makeEntry(1L, 1L, 5000));
+ long loc2 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 5000));
+ assertThat(logIdFromLocation(loc2), equalTo(logIdFromLocation(loc1)));
+ long loc3 = entryLogger.addEntry(2L, makeEntry(2L, 2L, 15000));
+ assertThat(logIdFromLocation(loc3), greaterThan(logIdFromLocation(loc2)));
+ long loc4 = entryLogger.addEntry(2L, makeEntry(2L, 3L, 15000));
+ assertThat(logIdFromLocation(loc4), greaterThan(logIdFromLocation(loc3)));
+ long loc5 = entryLogger.addEntry(3L, makeEntry(3L, 1L, 1000));
+ assertThat(logIdFromLocation(loc5), equalTo(logIdFromLocation(loc4)));
+ long loc6 = entryLogger.addEntry(3L, makeEntry(3L, 2L, 5000)); // expected to roll over into a fourth log
+
+ long logId1 = logIdFromLocation(loc2);
+ long logId2 = logIdFromLocation(loc3);
+ long logId3 = logIdFromLocation(loc5);
+ long logId4 = logIdFromLocation(loc6);
+ entryLogger.flush();
+
+ storage.setMasterKey(1L, new byte[0]);
+ storage.setMasterKey(2L, new byte[0]);
+ storage.setMasterKey(3L, new byte[0]);
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1, logId2, logId3));
+ assertTrue(entryLogger.logExists(logId1));
+ assertTrue(entryLogger.logExists(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+ assertTrue(entryLogger.logExists(logId4));
+
+ // all ledgers exist, nothing should disappear
+ final EntryLogMetadataMap entryLogMetaMap = gcThread.getEntryLogMetaMap();
+ gcThread.extractMetaFromEntryLogs();
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1, logId2, logId3));
+ assertTrue(entryLogMetaMap.containsKey(logId1));
+ assertTrue(entryLogMetaMap.containsKey(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+
+ storage.deleteLedger(1);
+ // With ledger 1 deleted, only logId1 is expected to be compacted.
+ gcThread.runWithFlags(true, true, false);
+
+ // logId1 is compacted and removed; logId2 and logId3 are kept
+ assertFalse(entryLogger.logExists(logId1));
+ assertTrue(entryLogger.logExists(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+ assertFalse(entryLogMetaMap.containsKey(logId1));
+ assertTrue(entryLogMetaMap.containsKey(logId2));
+
+ assertEquals(1, storage.getUpdatedLocations().size());
+
+ EntryLocation location2 = storage.getUpdatedLocations().get(0);
+ assertEquals(2, location2.getLedger());
+ assertEquals(1, location2.getEntry());
+ assertEquals(logId4, logIdFromLocation(location2.getLocation()));
+ }
+
+ /**
+ * Verifies that, without enabling useTargetEntryLogSizeForGc, entry logs whose ledgers are all
+ * still present are not compacted and no entry locations are updated.
+ */
+ @Test
+ public void testCompactionWithoutFileSizeCheck() throws Exception {
+ File ledgerDir = tmpDirs.createNew("testFileSize", "ledgers");
+ EntryLogger entryLogger = newLegacyEntryLogger(20000, ledgerDir);
+
+ MockLedgerStorage storage = new MockLedgerStorage();
+ MockLedgerManager lm = new MockLedgerManager();
+
+ // useTargetEntryLogSizeForGc is not set here (its getter defaults to false).
+ GarbageCollectorThread gcThread = new GarbageCollectorThread(
+ TestBKConfiguration.newServerConfiguration(), lm,
+ newDirsManager(ledgerDir),
+ storage, entryLogger, NullStatsLogger.INSTANCE);
+
+ // Add entries.
+ // Ledger 1 is on first entry log
+ // Ledger 2 spans first, second and third entry log
+ // Ledger 3 is on the third entry log (which is still active when
extract meta)
+ long loc1 = entryLogger.addEntry(1L, makeEntry(1L, 1L, 5000));
+ long loc2 = entryLogger.addEntry(2L, makeEntry(2L, 1L, 5000));
+ assertThat(logIdFromLocation(loc2), equalTo(logIdFromLocation(loc1)));
+ long loc3 = entryLogger.addEntry(2L, makeEntry(2L, 2L, 15000));
+ assertThat(logIdFromLocation(loc3),
greaterThan(logIdFromLocation(loc2)));
+ long loc4 = entryLogger.addEntry(2L, makeEntry(2L, 3L, 15000));
+ assertThat(logIdFromLocation(loc4),
greaterThan(logIdFromLocation(loc3)));
+ long loc5 = entryLogger.addEntry(3L, makeEntry(3L, 1L, 1000));
+ assertThat(logIdFromLocation(loc5), equalTo(logIdFromLocation(loc4)));
+
+ long logId1 = logIdFromLocation(loc2);
+ long logId2 = logIdFromLocation(loc3);
+ long logId3 = logIdFromLocation(loc5);
+ entryLogger.flush();
+
+ storage.setMasterKey(1L, new byte[0]);
+ storage.setMasterKey(2L, new byte[0]);
+ storage.setMasterKey(3L, new byte[0]);
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1,
logId2));
+ assertTrue(entryLogger.logExists(logId1));
+ assertTrue(entryLogger.logExists(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+
+ // all ledgers exist, nothing should disappear
+ final EntryLogMetadataMap entryLogMetaMap =
gcThread.getEntryLogMetaMap();
+ gcThread.extractMetaFromEntryLogs();
+
+ assertThat(entryLogger.getFlushedLogIds(), containsInAnyOrder(logId1,
logId2));
+ assertTrue(entryLogMetaMap.containsKey(logId1));
+ assertTrue(entryLogMetaMap.containsKey(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+
+ // No ledger has been deleted, so no entry log is expected to be compacted.
+ gcThread.runWithFlags(true, true, false);
+
+ assertTrue(entryLogger.logExists(logId1));
+ assertTrue(entryLogger.logExists(logId2));
+ assertTrue(entryLogger.logExists(logId3));
+ assertTrue(entryLogMetaMap.containsKey(logId1));
+ assertTrue(entryLogMetaMap.containsKey(logId2));
+
+ assertEquals(0, storage.getUpdatedLocations().size());
+ }
+
}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index cef24eaad1..812c5b072d 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -621,6 +621,15 @@ gcEntryLogMetadataCacheEnabled=false
# name "entrylogIndexCache"]
# gcEntryLogMetadataCachePath=
+# When judging whether an entry log file needs to be compacted, the garbage collector calculates
+# the file's usage ratio based on its actual size. However, if an entry log file is 1MB in size
+# and 0.9MB of its data is still in use, it won't be compacted because of its high usage ratio,
+# which can leave many small entry log files behind.
+# The parameter `useTargetEntryLogSizeForGc` determines whether usage is instead calculated against
+# the configured target entry log file size, `logSizeLimit`: remaining size / max(file size, logSizeLimit).
+# Default: useTargetEntryLogSizeForGc is false.
+# useTargetEntryLogSizeForGc=false
+
#############################################################################
## Disk utilization
#############################################################################