Repository: bookkeeper
Updated Branches:
  refs/heads/master b6dd50534 -> e0d331781
BOOKKEEPER-836: disable compaction when disk becomes full, otherwise compaction will fill up disk quickly (zhaijia via sijie)

Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e0d33178
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e0d33178
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e0d33178

Branch: refs/heads/master
Commit: e0d331781ec7a0e415bc6f16c38686aab34cb0c5
Parents: b6dd505
Author: Sijie Guo <si...@apache.org>
Authored: Tue Apr 21 00:55:03 2015 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Apr 21 00:55:03 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt | 2 +
 .../bookie/GarbageCollectorThread.java | 53 ++++++++++++++++---
 .../bookie/InterleavedLedgerStorage.java | 40 +++++++++++---
 .../bookkeeper/conf/ServerConfiguration.java | 27 ++++++++++
 .../bookkeeper/bookie/CompactionTest.java | 55 ++++++++++++++++++++
 5 files changed, 164 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e55b2d6..686b867 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -83,6 +83,8 @@ Trunk (unreleased changes)
 BOOKKEEPER-695: Some entry logs are not removed from the bookie storage (Matteo Merli via sijie)
+ BOOKKEEPER-836: disable compaction when disk becomes full, otherwise compaction will fill up disk quickly (zhaijia via sijie)
+
 Release 4.3.0 - 2014-10-03
 Non-backward compatible changes:

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 299fb3e..1ca43e0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -66,6 +66,8 @@ public class GarbageCollectorThread extends BookieThread { final double majorCompactionThreshold; final long majorCompactionInterval; + final boolean isForceGCAllowWhenNoSpace; + long lastMinorCompactionTime; long lastMajorCompactionTime; @@ -93,6 +95,10 @@ public class GarbageCollectorThread extends BookieThread { // Boolean to trigger a forced GC. final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false); + // Boolean to disable major compaction, when disk is almost full + final AtomicBoolean suspendMajorCompaction = new AtomicBoolean(false); + // Boolean to disable minor compaction, when disk is full + final AtomicBoolean suspendMinorCompaction = new AtomicBoolean(false); final GarbageCollector garbageCollector; final GarbageCleaner garbageCleaner; @@ -264,6 +270,7 @@ public class GarbageCollectorThread extends BookieThread { minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND; majorCompactionThreshold = conf.getMajorCompactionThreshold(); majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND; + isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) { if (minorCompactionThreshold > 1.0f) { @@ -321,6 +328,30 @@ public class GarbageCollectorThread extends BookieThread { } } + public void suspendMajorGC() { + if (suspendMajorCompaction.compareAndSet(false, true)) { + LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName()); + } + } + + public void resumeMajorGC() { + if 
(suspendMajorCompaction.compareAndSet(true, false)) { + LOG.info("{} Major Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName()); + } + } + + public void suspendMinorGC() { + if (suspendMinorCompaction.compareAndSet(false, true)) { + LOG.info("Suspend Minor Compaction triggered by thread: {}", Thread.currentThread().getName()); + } + } + + public void resumeMinorGC() { + if (suspendMinorCompaction.compareAndSet(true, false)) { + LOG.info("{} Minor Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName()); + } + } + @Override public void run() { while (running) { @@ -332,6 +363,7 @@ public class GarbageCollectorThread extends BookieThread { continue; } } + boolean force = forceGarbageCollection.get(); if (force) { LOG.info("Garbage collector thread forced to perform GC before expiry of wait time."); @@ -347,11 +379,20 @@ public class GarbageCollectorThread extends BookieThread { // gc entry logs doGcEntryLogs(); + boolean suspendMajor = suspendMajorCompaction.get(); + boolean suspendMinor = suspendMinorCompaction.get(); + if (suspendMajor) { + LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); + } + if (suspendMinor) { + LOG.info("Disk full, suspend minor compaction to slow down filling disk."); + } + long curTime = MathUtils.now(); - if (force || (enableMajorCompaction && - curTime - lastMajorCompactionTime > majorCompactionInterval)) { + if (enableMajorCompaction && (!suspendMajor) && + (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { // enter major compaction - LOG.info("Enter major compaction"); + LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); doCompactEntryLogs(majorCompactionThreshold); lastMajorCompactionTime = MathUtils.now(); // also move minor compaction time @@ -359,10 +400,10 @@ public class GarbageCollectorThread extends BookieThread { continue; } - if (force || (enableMinorCompaction && - curTime 
- lastMinorCompactionTime > minorCompactionInterval)) { + if (enableMinorCompaction && (!suspendMinor) && + (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { // enter minor compaction - LOG.info("Enter minor compaction"); + LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); doCompactEntryLogs(minorCompactionThreshold); lastMinorCompactionTime = MathUtils.now(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 5ee9ab0..eb27757 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -127,17 +127,31 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { @Override public void diskAlmostFull(File disk) { - gcThread.enableForceGC(); + if (gcThread.isForceGCAllowWhenNoSpace) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + } } @Override public void diskFull(File disk) { - gcThread.enableForceGC(); + if (gcThread.isForceGCAllowWhenNoSpace) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + gcThread.suspendMinorGC(); + } } @Override public void allDisksFull() { - gcThread.enableForceGC(); + if (gcThread.isForceGCAllowWhenNoSpace) { + gcThread.enableForceGC(); + } else { + gcThread.suspendMajorGC(); + gcThread.suspendMinorGC(); + } } @Override @@ -147,14 +161,26 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { @Override public void diskWritable(File disk) { - // we have enough space now, disable force gc. 
- gcThread.disableForceGC(); + // we have enough space now + if (gcThread.isForceGCAllowWhenNoSpace) { + // disable force gc. + gcThread.disableForceGC(); + } else { + // resume compaction to normal. + gcThread.resumeMajorGC(); + gcThread.resumeMinorGC(); + } } @Override public void diskJustWritable(File disk) { - // if a disk is just writable, we still need force gc. - gcThread.enableForceGC(); + if (gcThread.isForceGCAllowWhenNoSpace) { + // if a disk is just writable, we still need force gc. + gcThread.enableForceGC(); + } else { + // still under warn threshold, only resume minor compaction. + gcThread.resumeMinorGC(); + } } }; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index cd9f7a0..9164616 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -47,6 +47,7 @@ public class ServerConfiguration extends AbstractConfiguration { // Gc Parameters protected final static String GC_WAIT_TIME = "gcWaitTime"; + protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = "isForceGCAllowWhenNoSpace"; // Sync Parameters protected final static String FLUSH_INTERVAL = "flushInterval"; // Bookie death watch interval @@ -783,6 +784,32 @@ public class ServerConfiguration extends AbstractConfiguration { } /** + * Get whether force compaction is allowed when disk full or almost full. + * + * Force GC may get some space back, but may also fill up disk space more + * quickly. This is because new log files are created before GC, while old + * garbage log files deleted after GC. 
+ * + * @return true - do force GC when disk full, + * false - suspend GC when disk full. + */ + public boolean getIsForceGCAllowWhenNoSpace() { + return getBoolean(IS_FORCE_GC_ALLOW_WHEN_NO_SPACE, false); + } + + /** + * Set whether force GC is allowed when disk full or almost full. + * + * @param force true to allow force GC; false to suspend GC + * + * @return ServerConfiguration + */ + public ServerConfiguration setIsForceGCAllowWhenNoSpace(boolean force) { + setProperty(IS_FORCE_GC_ALLOW_WHEN_NO_SPACE, force); + return this; + } + + /** * Set the grace period which the rereplication worker will wait before * fencing and rereplicating a ledger fragment which is still being written * to, on bookie failure. http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 6c9c4a7..101fdac 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -571,4 +571,59 @@ public class CompactionTest extends BookKeeperClusterTestCase { bb.flip(); return bb; } + + /** + * Suspend garbage collection when suspendMajor/suspendMinor is set. + */ + @Test(timeout=60000) + public void testSuspendGarbageCollection() throws Exception { + ServerConfiguration conf = newServerConfiguration(); + conf.setGcWaitTime(500); + conf.setMinorCompactionInterval(1); + conf.setMajorCompactionInterval(2); + LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs()); + CheckpointSource cp = new CheckpointSource() { + @Override + public Checkpoint newCheckpoint() { + // Do nothing. 
+ return null; + } + + @Override + public void checkpointComplete(Checkpoint checkPoint, boolean compact) + throws IOException { + // Do nothing. + } + }; + Bookie.checkDirectoryStructure(conf.getJournalDir()); + for (File dir : dirManager.getAllLedgerDirs()) { + Bookie.checkDirectoryStructure(dir); + } + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, + LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(), + dirManager, cp); + storage.start(); + + // test suspend Major GC. + Thread.sleep(conf.getMajorCompactionInterval() * 1000 + + conf.getGcWaitTime()); + storage.gcThread.suspendMajorGC(); + Thread.sleep(1000); + long startTime = MathUtils.now(); + Thread.sleep(conf.getMajorCompactionInterval() * 1000 + + conf.getGcWaitTime()); + assertTrue("major compaction triggered while set suspend", + storage.gcThread.lastMajorCompactionTime < startTime); + + // test suspend Minor GC. + storage.gcThread.suspendMinorGC(); + Thread.sleep(1000); + startTime = MathUtils.now(); + Thread.sleep(conf.getMajorCompactionInterval() * 1000 + + conf.getGcWaitTime()); + assertTrue("minor compaction triggered while set suspend", + storage.gcThread.lastMinorCompactionTime < startTime); + storage.gcThread.resumeMinorGC(); + storage.gcThread.resumeMajorGC(); + } }