This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e413c70 Issue 2728: Entry Log GC may get blocked when using
entryLogPerLedgerEnabled option (#2779)
e413c70 is described below
commit e413c7094f5a4ba7cde3b2e399f8ebe4366b464c
Author: Raúl Gracia <[email protected]>
AuthorDate: Mon Oct 11 15:35:02 2021 +0200
Issue 2728: Entry Log GC may get blocked when using
entryLogPerLedgerEnabled option (#2779)
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 30 ++++++++++
.../bookkeeper/bookie/GarbageCollectorThread.java | 28 ++++++---
.../apache/bookkeeper/bookie/CompactionTest.java | 70 ++++++++++++++++++++++
.../java/org/apache/bookkeeper/util/TestUtils.java | 36 +++++++----
4 files changed, 147 insertions(+), 17 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 504adfa..49a9ca4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -482,6 +482,27 @@ public class EntryLogger {
return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
}
+ /**
+ * Get the last log id created so far. If entryLogPerLedger is enabled,
the Garbage Collector
+ * process needs to look beyond the least unflushed entry log file, as
there may be entry logs
+ * ready to be garbage collected.
+ *
+ * @return last entry log id created.
+ */
+ long getLastLogId() {
+ return recentlyCreatedEntryLogsStatus.getLastLogId();
+ }
+
+ /**
+ * Returns whether the given entry log id exists and has been rotated already.
+ *
+ * @param entryLogId EntryLog id to check.
+ * @return Whether the given entryLogId exists and has been rotated.
+ */
+ boolean isFlushedEntryLog(Long entryLogId) {
+ return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
+ }
+
long getPreviousAllocatedEntryLogId() {
return entryLoggerAllocator.getPreallocatedLogId();
}
@@ -1249,5 +1270,14 @@ public class EntryLogger {
synchronized long getLeastUnflushedLogId() {
return leastUnflushedLogId;
}
+
+ synchronized long getLastLogId() {
+ return !entryLogsStatusMap.isEmpty() ?
entryLogsStatusMap.lastKey() : 0;
+ }
+
+ synchronized boolean isFlushedEntryLog(Long entryLogId) {
+ return entryLogsStatusMap.containsKey(entryLogId) &&
entryLogsStatusMap.get(entryLogId)
+ || entryLogId < leastUnflushedLogId;
+ }
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index fb54890..cafbf53 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import lombok.Getter;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
@@ -583,12 +584,15 @@ public class GarbageCollectorThread extends SafeRunnable {
* @throws IOException
*/
protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long,
EntryLogMetadata> entryLogMetaMap) {
- // Extract it for every entry log except for the current one.
- // Entry Log ID's are just a long value that starts at 0 and increments
- // by 1 when the log fills up and we roll to a new one.
- long curLogId = entryLogger.getLeastUnflushedLogId();
+ // Entry Log ID's are just a long value that starts at 0 and
increments by 1 when the log fills up and we roll
+ // to a new one. We scan entry logs as follows:
+ // - entryLogPerLedgerEnabled is false: Extract it for every entry log
except for the current one (un-flushed).
+ // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up
to the highest known id.
+ Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled()
? entryLogger.getLastLogId() :
+ entryLogger.getLeastUnflushedLogId();
boolean hasExceptionWhenScan = false;
- for (long entryLogId = scannedLogId; entryLogId < curLogId;
entryLogId++) {
+ boolean increaseScannedLogId = true;
+ for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get();
entryLogId++) {
// Comb the current entry log file if it has not already been
extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
@@ -600,6 +604,15 @@ public class GarbageCollectorThread extends SafeRunnable {
continue;
}
+ // If entryLogPerLedgerEnabled is true, we will look for entry log
files beyond getLeastUnflushedLogId()
+ // that have been explicitly rotated or below
getLeastUnflushedLogId().
+ if (conf.isEntryLogPerLedgerEnabled() &&
!entryLogger.isFlushedEntryLog(entryLogId)) {
+ LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled).
Starting next iteration at this point.",
+ entryLogId);
+ increaseScannedLogId = false;
+ continue;
+ }
+
LOG.info("Extracting entry log meta from entryLogId: {}",
entryLogId);
try {
@@ -619,8 +632,9 @@ public class GarbageCollectorThread extends SafeRunnable {
// if scan failed on some entry log, we don't move 'scannedLogId'
to next id
// if scan succeed, we don't need to scan it again during next gc
run,
- // we move 'scannedLogId' to next id
- if (!hasExceptionWhenScan) {
+ // we move 'scannedLogId' to next id (unless
entryLogPerLedgerEnabled is true
+ // and we have found an un-flushed entry log already).
+ if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled()
|| increaseScannedLogId)) {
++scannedLogId;
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 408446d..ccf6fd4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -51,6 +51,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -545,6 +546,75 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
}
@Test
+ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws
Exception {
+ // restart bookies
+ restartBookies(c-> {
+ c.setMajorCompactionThreshold(0.0f);
+ c.setGcWaitTime(60000);
+ c.setMinorCompactionInterval(120000);
+ c.setMajorCompactionInterval(240000);
+ c.setForceAllowCompaction(true);
+ c.setEntryLogPerLedgerEnabled(true);
+ return c;
+ });
+
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+ long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+ assertFalse(getGCThread().enableMajorCompaction);
+ assertTrue(getGCThread().enableMinorCompaction);
+
+ // remove ledgers 1 and 2
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+
+ // Need to wait until entry log 3 gets flushed before initiating GC to
satisfy assertions.
+ while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ getGCThread().triggerGC(true, false, false).get();
+
+ assertEquals(lastMajorCompactionTime,
getGCThread().lastMajorCompactionTime);
+ assertTrue(getGCThread().lastMinorCompactionTime >
lastMinorCompactionTime);
+
+ // At this point, we have the following state of ledgers and entry
logs:
+ // L0 (not deleted) -> E0 (un-flushed): Entry log should exist.
+ // L1 (deleted) -> E1 (un-flushed): Entry log should exist as
un-flushed entry logs are not considered for GC.
+ // L2 (deleted) -> E2 (flushed): Entry log should have been garbage
collected.
+ // E3 (flushed): Entry log should have been garbage
collected.
+ // E4 (un-flushed): Entry log should exist as
un-flushed entry logs are not considered for GC.
+ assertTrue("Not found entry log files [0, 1, 4].log that should not
have been compacted in: "
+ + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0,
1, 4));
+ assertTrue("Found entry log files [2, 3].log that should have been
compacted in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 2,
3));
+
+ // Now, let's mark E1 as flushed, as its ledger L1 has been deleted
already. In this case, the GC algorithm
+ // should consider it for deletion.
+
getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
+ getGCThread().triggerGC(true, false, false).get();
+ assertTrue("Found entry log file 1.log that should have been compacted
in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0),
1));
+
+ // Once removed the ledger L0, then deleting E0 is fine (only if it
has been flushed).
+ bkc.deleteLedger(lhs[0].getId());
+ getGCThread().triggerGC(true, false, false).get();
+ assertTrue("Not found entry log file 0.log that should not have been
compacted in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0));
+
getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
+ getGCThread().triggerGC(true, false, false).get();
+ assertTrue("Found entry log file 0.log that should have been compacted
in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0),
0));
+ }
+
+ @Test
public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception
{
// prepare data
LedgerHandle[] lhs = prepareData(3, false);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 462d472..27f1abb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.util;
import java.io.File;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -49,9 +50,31 @@ public final class TestUtils {
return bookieId.toString().replace('.', '_').replace('-',
'_').replace(":", "_");
}
+ public static boolean hasAllLogFiles(File ledgerDirectory, Integer...
logsId) {
+ Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+ return logs.containsAll(Arrays.asList(logsId));
+ }
+
+ public static boolean hasNoneLogFiles(File ledgerDirectory, Integer...
logsId) {
+ Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+ return Arrays.stream(logsId).noneMatch(logs::contains);
+ }
+
public static boolean hasLogFiles(File ledgerDirectory, boolean partial,
Integer... logsId) {
- boolean result = partial ? false : true;
- Set<Integer> logs = new HashSet<Integer>();
+ boolean result = !partial;
+ Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+ for (Integer logId : logsId) {
+ boolean exist = logs.contains(logId);
+ if ((partial && exist)
+ || (!partial && !exist)) {
+ return !result;
+ }
+ }
+ return result;
+ }
+
+ private static Set<Integer> findEntryLogFileIds(File ledgerDirectory) {
+ Set<Integer> logs = new HashSet<>();
for (File file :
BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles()) {
if (file.isFile()) {
String name = file.getName();
@@ -61,14 +84,7 @@ public final class TestUtils {
logs.add(Integer.parseInt(name.split("\\.")[0], 16));
}
}
- for (Integer logId : logsId) {
- boolean exist = logs.contains(logId);
- if ((partial && exist)
- || (!partial && !exist)) {
- return !result;
- }
- }
- return result;
+ return logs;
}
public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws
Exception {