This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e413c70  Issue 2728: Entry Log GC may get blocked when using 
entryLogPerLedgerEnabled option (#2779)
e413c70 is described below

commit e413c7094f5a4ba7cde3b2e399f8ebe4366b464c
Author: Raúl Gracia <[email protected]>
AuthorDate: Mon Oct 11 15:35:02 2021 +0200

    Issue 2728: Entry Log GC may get blocked when using 
entryLogPerLedgerEnabled option (#2779)
---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 30 ++++++++++
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 28 ++++++---
 .../apache/bookkeeper/bookie/CompactionTest.java   | 70 ++++++++++++++++++++++
 .../java/org/apache/bookkeeper/util/TestUtils.java | 36 +++++++----
 4 files changed, 147 insertions(+), 17 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 504adfa..49a9ca4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -482,6 +482,27 @@ public class EntryLogger {
         return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
+    /**
+     * Get the last log id created so far. If entryLogPerLedger is enabled, 
the Garbage Collector
+     * process needs to look beyond the least unflushed entry log file, as 
there may be entry logs
+     * ready to be garbage collected.
+     *
+     * @return last entry log id created.
+     */
+    long getLastLogId() {
+        return recentlyCreatedEntryLogsStatus.getLastLogId();
+    }
+
+    /**
+     * Returns whether the given entry log id exists and has been rotated already.
+     *
+     * @param entryLogId EntryLog id to check.
+     * @return Whether the given entryLogId exists and has been rotated.
+     */
+    boolean isFlushedEntryLog(Long entryLogId) {
+        return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
+    }
+
     long getPreviousAllocatedEntryLogId() {
         return entryLoggerAllocator.getPreallocatedLogId();
     }
@@ -1249,5 +1270,14 @@ public class EntryLogger {
         synchronized long getLeastUnflushedLogId() {
             return leastUnflushedLogId;
         }
+
+        synchronized long getLastLogId() {
+            return !entryLogsStatusMap.isEmpty() ? 
entryLogsStatusMap.lastKey() : 0;
+        }
+
+        synchronized boolean isFlushedEntryLog(Long entryLogId) {
+            return entryLogsStatusMap.containsKey(entryLogId) && 
entryLogsStatusMap.get(entryLogId)
+                    || entryLogId < leastUnflushedLogId;
+        }
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index fb54890..cafbf53 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import lombok.Getter;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
@@ -583,12 +584,15 @@ public class GarbageCollectorThread extends SafeRunnable {
      * @throws IOException
      */
     protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, 
EntryLogMetadata> entryLogMetaMap) {
-        // Extract it for every entry log except for the current one.
-        // Entry Log ID's are just a long value that starts at 0 and increments
-        // by 1 when the log fills up and we roll to a new one.
-        long curLogId = entryLogger.getLeastUnflushedLogId();
+        // Entry Log ID's are just a long value that starts at 0 and 
increments by 1 when the log fills up and we roll
+        // to a new one. We scan entry logs as follows:
+        // - entryLogPerLedgerEnabled is false: Extract it for every entry log 
except for the current one (un-flushed).
+        // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up 
to the highest known id.
+        Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() 
? entryLogger.getLastLogId() :
+                entryLogger.getLeastUnflushedLogId();
         boolean hasExceptionWhenScan = false;
-        for (long entryLogId = scannedLogId; entryLogId < curLogId; 
entryLogId++) {
+        boolean increaseScannedLogId = true;
+        for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); 
entryLogId++) {
             // Comb the current entry log file if it has not already been 
extracted.
             if (entryLogMetaMap.containsKey(entryLogId)) {
                 continue;
@@ -600,6 +604,15 @@ public class GarbageCollectorThread extends SafeRunnable {
                 continue;
             }
 
+            // If entryLogPerLedgerEnabled is true, we will look for entry log 
files beyond getLeastUnflushedLogId()
+            // that have been explicitly rotated or below 
getLeastUnflushedLogId().
+            if (conf.isEntryLogPerLedgerEnabled() && 
!entryLogger.isFlushedEntryLog(entryLogId)) {
+                LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). 
Starting next iteration at this point.",
+                        entryLogId);
+                increaseScannedLogId = false;
+                continue;
+            }
+
             LOG.info("Extracting entry log meta from entryLogId: {}", 
entryLogId);
 
             try {
@@ -619,8 +632,9 @@ public class GarbageCollectorThread extends SafeRunnable {
 
             // if scan failed on some entry log, we don't move 'scannedLogId' 
to next id
             // if scan succeed, we don't need to scan it again during next gc 
run,
-            // we move 'scannedLogId' to next id
-            if (!hasExceptionWhenScan) {
+            // we move 'scannedLogId' to next id (unless 
entryLogPerLedgerEnabled is true
+            // and we have found an un-flushed entry log already).
+            if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() 
|| increaseScannedLogId)) {
                 ++scannedLogId;
             }
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 408446d..ccf6fd4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -51,6 +51,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -545,6 +546,75 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws 
Exception {
+        // restart bookies
+        restartBookies(c-> {
+            c.setMajorCompactionThreshold(0.0f);
+            c.setGcWaitTime(60000);
+            c.setMinorCompactionInterval(120000);
+            c.setMajorCompactionInterval(240000);
+            c.setForceAllowCompaction(true);
+            c.setEntryLogPerLedgerEnabled(true);
+            return c;
+        });
+
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+        long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+        assertFalse(getGCThread().enableMajorCompaction);
+        assertTrue(getGCThread().enableMinorCompaction);
+
+        // remove ledgers 1 and 2
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+
+        // Need to wait until entry log 3 gets flushed before initiating GC to 
satisfy assertions.
+        while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        getGCThread().triggerGC(true, false, false).get();
+
+        assertEquals(lastMajorCompactionTime, 
getGCThread().lastMajorCompactionTime);
+        assertTrue(getGCThread().lastMinorCompactionTime > 
lastMinorCompactionTime);
+
+        // At this point, we have the following state of ledgers and entry 
logs:
+        // L0 (not deleted) -> E0 (un-flushed): Entry log should exist.
+        // L1 (deleted) -> E1 (un-flushed): Entry log should exist as 
un-flushed entry logs are not considered for GC.
+        // L2 (deleted) -> E2 (flushed): Entry log should have been garbage 
collected.
+        //                 E3 (flushed): Entry log should have been garbage 
collected.
+        //                 E4 (un-flushed): Entry log should exist as 
un-flushed entry logs are not considered for GC.
+        assertTrue("Not found entry log files [0, 1, 4].log that should not 
have been compacted in: "
+                + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0, 
1, 4));
+        assertTrue("Found entry log files [2, 3].log that should have been 
compacted in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 2, 
3));
+
+        // Now, let's mark E1 as flushed, as its ledger L1 has been deleted 
already. In this case, the GC algorithm
+        // should consider it for deletion.
+        
getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
+        getGCThread().triggerGC(true, false, false).get();
+        assertTrue("Found entry log file 1.log that should have been compacted 
in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 
1));
+
+        // Once removed the ledger L0, then deleting E0 is fine (only if it 
has been flushed).
+        bkc.deleteLedger(lhs[0].getId());
+        getGCThread().triggerGC(true, false, false).get();
+        assertTrue("Found entry log file 0.log that should not have been 
compacted in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0));
+        
getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
+        getGCThread().triggerGC(true, false, false).get();
+        assertTrue("Found entry log file 0.log that should have been compacted 
in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 
0));
+    }
+
+    @Test
     public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception 
{
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 462d472..27f1abb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.util;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -49,9 +50,31 @@ public final class TestUtils {
         return bookieId.toString().replace('.', '_').replace('-', 
'_').replace(":", "_");
     }
 
+    public static boolean hasAllLogFiles(File ledgerDirectory, Integer... 
logsId) {
+        Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+        return logs.containsAll(Arrays.asList(logsId));
+    }
+
+    public static boolean hasNoneLogFiles(File ledgerDirectory, Integer... 
logsId) {
+        Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+        return Arrays.stream(logsId).noneMatch(logs::contains);
+    }
+
     public static boolean hasLogFiles(File ledgerDirectory, boolean partial, 
Integer... logsId) {
-        boolean result = partial ? false : true;
-        Set<Integer> logs = new HashSet<Integer>();
+        boolean result = !partial;
+        Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+        for (Integer logId : logsId) {
+            boolean exist = logs.contains(logId);
+            if ((partial && exist)
+                    || (!partial && !exist)) {
+                return !result;
+            }
+        }
+        return result;
+    }
+
+    private static Set<Integer> findEntryLogFileIds(File ledgerDirectory) {
+        Set<Integer> logs = new HashSet<>();
         for (File file : 
BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles()) {
             if (file.isFile()) {
                 String name = file.getName();
@@ -61,14 +84,7 @@ public final class TestUtils {
                 logs.add(Integer.parseInt(name.split("\\.")[0], 16));
             }
         }
-        for (Integer logId : logsId) {
-            boolean exist = logs.contains(logId);
-            if ((partial && exist)
-                    || (!partial && !exist)) {
-                return !result;
-            }
-        }
-        return result;
+        return logs;
     }
 
     public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws 
Exception {

Reply via email to