Repository: bookkeeper Updated Branches: refs/heads/master bf8ef14bc -> 825e0e7b4
BOOKKEEPER-1004: Allow bookie garbage collection to be triggered manu⦠Ran CompactionTest#testForceGarbageCollection Author: Govind Menon <[email protected]> Reviewers: Enrico Olivelli <[email protected]> Closes #109 from govind-menon/BOOKKEEPER-1004 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/825e0e7b Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/825e0e7b Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/825e0e7b Branch: refs/heads/master Commit: 825e0e7b4a27730d85afbe823a405ba333de3a65 Parents: bf8ef14 Author: Govind Menon <[email protected]> Authored: Tue Mar 28 13:39:22 2017 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Mar 28 13:39:22 2017 -0700 ---------------------------------------------------------------------- .../bookie/GarbageCollectorThread.java | 136 +++++++++++-------- .../bookkeeper/bookie/BookieAccessor.java | 1 + .../bookkeeper/bookie/CompactionTest.java | 4 +- 3 files changed, 80 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/825e0e7b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index aa42475..f4b35f8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -29,16 +29,22 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,13 +52,16 @@ import org.slf4j.LoggerFactory; * This is the garbage collector thread that runs in the background to * remove any entry log files that no longer contains any active ledger. */ -public class GarbageCollectorThread extends BookieThread { +public class GarbageCollectorThread extends SafeRunnable { private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class); private static final int SECOND = 1000; // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<Long, EntryLogMetadata>(); + ScheduledExecutorService gcExecutor; + Future<?> scheduledFuture = null; + // This is how often we want to run the Garbage Collector Thread (in milliseconds). final long gcWaitTime; @@ -188,7 +197,9 @@ public class GarbageCollectorThread extends BookieThread { LedgerManager ledgerManager, final CompactableLedgerStorage ledgerStorage) throws IOException { - super("GarbageCollectorThread"); + gcExecutor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("GarbageCollectorThread-%d").build() + ); this.entryLogger = ledgerStorage.getEntryLogger(); this.ledgerStorage = ledgerStorage; @@ -266,10 +277,10 @@ public class GarbageCollectorThread extends BookieThread { lastMinorCompactionTime = lastMajorCompactionTime = MathUtils.now(); } - public synchronized void enableForceGC() { + public void enableForceGC() { if (forceGarbageCollection.compareAndSet(false, true)) { LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName()); - notify(); + triggerGC(); } } @@ -280,6 +291,13 @@ public class GarbageCollectorThread extends BookieThread { } } + /** + * Manually trigger GC (for testing) + */ + Future<?> triggerGC() { + return gcExecutor.submit(this); + } + public void suspendMajorGC() { if (suspendMajorCompaction.compareAndSet(false, true)) { LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName()); @@ -304,64 +322,60 @@ public class GarbageCollectorThread extends BookieThread { } } - @Override - public void run() { - while (running) { - synchronized (this) { - try { - wait(gcWaitTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - continue; - } - } - - boolean force = forceGarbageCollection.get(); - if (force) { - LOG.info("Garbage collector thread forced to perform GC before expiry of wait time."); - } + public void start() { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + scheduledFuture = gcExecutor.scheduleAtFixedRate(this, gcWaitTime, gcWaitTime, TimeUnit.MILLISECONDS); + } - // Extract all of the ledger ID's that comprise all of the entry logs - // (except for the current new one which is still being written to). - entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap); + @Override + public void safeRun() { + boolean force = forceGarbageCollection.get(); + if (force) { + LOG.info("Garbage collector thread forced to perform GC before expiry of wait time."); + } - // gc inactive/deleted ledgers - doGcLedgers(); + // Extract all of the ledger ID's that comprise all of the entry logs + // (except for the current new one which is still being written to). + entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap); - // gc entry logs - doGcEntryLogs(); + // gc inactive/deleted ledgers + doGcLedgers(); - boolean suspendMajor = suspendMajorCompaction.get(); - boolean suspendMinor = suspendMinorCompaction.get(); - if (suspendMajor) { - LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); - } - if (suspendMinor) { - LOG.info("Disk full, suspend minor compaction to slow down filling disk."); - } + // gc entry logs + doGcEntryLogs(); - long curTime = MathUtils.now(); - if (enableMajorCompaction && (!suspendMajor) && - (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { - // enter major compaction - LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); - doCompactEntryLogs(majorCompactionThreshold); - lastMajorCompactionTime = MathUtils.now(); - // also move minor compaction time - lastMinorCompactionTime = lastMajorCompactionTime; - continue; - } + boolean suspendMajor = suspendMajorCompaction.get(); + boolean suspendMinor = suspendMinorCompaction.get(); + if (suspendMajor) { + LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); + } + if (suspendMinor) { + LOG.info("Disk full, suspend minor compaction to slow down filling disk."); + } - if (enableMinorCompaction && (!suspendMinor) && - (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { - // enter minor compaction - LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); - doCompactEntryLogs(minorCompactionThreshold); - lastMinorCompactionTime = MathUtils.now(); - } + long curTime = MathUtils.now(); + if (enableMajorCompaction && (!suspendMajor) && + (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { + // enter major compaction + LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); + doCompactEntryLogs(majorCompactionThreshold); + lastMajorCompactionTime = MathUtils.now(); + // and also move minor compaction time + lastMinorCompactionTime = lastMajorCompactionTime; forceGarbageCollection.set(false); + return; + } + + if (enableMinorCompaction && (!suspendMinor) && + (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { + // enter minor compaction + LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); + doCompactEntryLogs(minorCompactionThreshold); + lastMinorCompactionTime = MathUtils.now(); } - LOG.info("GarbageCollectorThread exited loop!"); + forceGarbageCollection.set(false); } /** @@ -468,12 +482,16 @@ public class GarbageCollectorThread extends BookieThread { public void shutdown() throws InterruptedException { this.running = false; LOG.info("Shutting down GarbageCollectorThread"); - if (compacting.compareAndSet(false, true)) { - // if setting compacting flag succeed, means gcThread is not compacting now - // it is safe to interrupt itself now - this.interrupt(); + + while (!compacting.compareAndSet(false, true)) { + // Wait till the thread stops compacting + Thread.sleep(100); + } + gcExecutor.shutdown(); + if (gcExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("GC executor did not shut down in 60 seconds. Killing"); + gcExecutor.shutdownNow(); } - this.join(); } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/825e0e7b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java index f49f8ae..07d810e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; +import java.util.concurrent.Future; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/825e0e7b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index ed881f1..6ae5e60 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -232,9 +232,9 @@ public class CompactionTest extends BookKeeperClusterTestCase { dirManager, dirManager, cp, NullStatsLogger.INSTANCE); storage.start(); long startTime = MathUtils.now(); - Thread.sleep(2000); storage.gcThread.enableForceGC(); - Thread.sleep(1000); + storage.gcThread.triggerGC().get(); //major + storage.gcThread.triggerGC().get(); //minor // Minor and Major compaction times should be larger than when we started // this test. assertTrue("Minor or major compaction did not trigger even on forcing.",
