Repository: bookkeeper
Updated Branches:
  refs/heads/master b6dd50534 -> e0d331781


BOOKKEEPER-836: disable compaction when disk becomes full, otherwise compaction 
will fill up disk quickly (zhaijia via sijie)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e0d33178
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e0d33178
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e0d33178

Branch: refs/heads/master
Commit: e0d331781ec7a0e415bc6f16c38686aab34cb0c5
Parents: b6dd505
Author: Sijie Guo <si...@apache.org>
Authored: Tue Apr 21 00:55:03 2015 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Apr 21 00:55:03 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../bookie/GarbageCollectorThread.java          | 53 ++++++++++++++++---
 .../bookie/InterleavedLedgerStorage.java        | 40 +++++++++++---
 .../bookkeeper/conf/ServerConfiguration.java    | 27 ++++++++++
 .../bookkeeper/bookie/CompactionTest.java       | 55 ++++++++++++++++++++
 5 files changed, 164 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e55b2d6..686b867 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -83,6 +83,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-695: Some entry logs are not removed from the bookie 
storage (Matteo Merli via sijie)
 
+        BOOKKEEPER-836: disable compaction when disk becomes full, otherwise 
compaction will fill up disk quickly (zhaijia via sijie)
+
 Release 4.3.0 - 2014-10-03
 
   Non-backward compatible changes:

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 299fb3e..1ca43e0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -66,6 +66,8 @@ public class GarbageCollectorThread extends BookieThread {
     final double majorCompactionThreshold;
     final long majorCompactionInterval;
 
+    final boolean isForceGCAllowWhenNoSpace;
+
     long lastMinorCompactionTime;
     long lastMajorCompactionTime;
 
@@ -93,6 +95,10 @@ public class GarbageCollectorThread extends BookieThread {
 
     // Boolean to trigger a forced GC.
     final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
+    // Boolean to disable major compaction, when disk is almost full
+    final AtomicBoolean suspendMajorCompaction = new AtomicBoolean(false);
+    // Boolean to disable minor compaction, when disk is full
+    final AtomicBoolean suspendMinorCompaction = new AtomicBoolean(false);
 
     final GarbageCollector garbageCollector;
     final GarbageCleaner garbageCleaner;
@@ -264,6 +270,7 @@ public class GarbageCollectorThread extends BookieThread {
         minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
         majorCompactionThreshold = conf.getMajorCompactionThreshold();
         majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
+        isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
 
         if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
             if (minorCompactionThreshold > 1.0f) {
@@ -321,6 +328,30 @@ public class GarbageCollectorThread extends BookieThread {
         }
     }
 
+    public void suspendMajorGC() {
+        if (suspendMajorCompaction.compareAndSet(false, true)) {
+            LOG.info("Suspend Major Compaction triggered by thread: {}", 
Thread.currentThread().getName());
+        }
+    }
+    
+    public void resumeMajorGC() {
+        if (suspendMajorCompaction.compareAndSet(true, false)) {
+            LOG.info("{} Major Compaction back to normal since bookie has 
enough space now.", Thread.currentThread().getName());
+        }
+    }
+
+    public void suspendMinorGC() {
+        if (suspendMinorCompaction.compareAndSet(false, true)) {
+            LOG.info("Suspend Minor Compaction triggered by thread: {}", 
Thread.currentThread().getName());
+        }
+    }
+    
+    public void resumeMinorGC() {
+        if (suspendMinorCompaction.compareAndSet(true, false)) {
+            LOG.info("{} Minor Compaction back to normal since bookie has 
enough space now.", Thread.currentThread().getName());
+        }
+    }
+
     @Override
     public void run() {
         while (running) {
@@ -332,6 +363,7 @@ public class GarbageCollectorThread extends BookieThread {
                     continue;
                 }
             }
+
             boolean force = forceGarbageCollection.get();
             if (force) {
                 LOG.info("Garbage collector thread forced to perform GC before 
expiry of wait time.");
@@ -347,11 +379,20 @@ public class GarbageCollectorThread extends BookieThread {
             // gc entry logs
             doGcEntryLogs();
 
+            boolean suspendMajor = suspendMajorCompaction.get();
+            boolean suspendMinor = suspendMinorCompaction.get();
+            if (suspendMajor) {
+                LOG.info("Disk almost full, suspend major compaction to slow 
down filling disk.");
+            }
+            if (suspendMinor) {
+                LOG.info("Disk full, suspend minor compaction to slow down 
filling disk.");
+            }
+
             long curTime = MathUtils.now();
-            if (force || (enableMajorCompaction &&
-                curTime - lastMajorCompactionTime > majorCompactionInterval)) {
+            if (enableMajorCompaction && (!suspendMajor) && 
+                (force || curTime - lastMajorCompactionTime > 
majorCompactionInterval)) {
                 // enter major compaction
-                LOG.info("Enter major compaction");
+                LOG.info("Enter major compaction, suspendMajor {}", 
suspendMajor);
                 doCompactEntryLogs(majorCompactionThreshold);
                 lastMajorCompactionTime = MathUtils.now();
                 // also move minor compaction time
@@ -359,10 +400,10 @@ public class GarbageCollectorThread extends BookieThread {
                 continue;
             }
 
-            if (force || (enableMinorCompaction &&
-                curTime - lastMinorCompactionTime > minorCompactionInterval)) {
+            if (enableMinorCompaction && (!suspendMinor) && 
+                (force || curTime - lastMinorCompactionTime > 
minorCompactionInterval)) {
                 // enter minor compaction
-                LOG.info("Enter minor compaction");
+                LOG.info("Enter minor compaction, suspendMinor {}", 
suspendMinor);
                 doCompactEntryLogs(minorCompactionThreshold);
                 lastMinorCompactionTime = MathUtils.now();
             }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 5ee9ab0..eb27757 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -127,17 +127,31 @@ class InterleavedLedgerStorage implements LedgerStorage, 
EntryLogListener {
 
             @Override
             public void diskAlmostFull(File disk) {
-                gcThread.enableForceGC();
+                if (gcThread.isForceGCAllowWhenNoSpace) {
+                    gcThread.enableForceGC();
+                } else {
+                    gcThread.suspendMajorGC();
+                }
             }
 
             @Override
             public void diskFull(File disk) {
-                gcThread.enableForceGC();
+                if (gcThread.isForceGCAllowWhenNoSpace) {
+                    gcThread.enableForceGC();
+                } else {
+                    gcThread.suspendMajorGC();
+                    gcThread.suspendMinorGC();
+                }
             }
 
             @Override
             public void allDisksFull() {
-                gcThread.enableForceGC();
+                if (gcThread.isForceGCAllowWhenNoSpace) {
+                    gcThread.enableForceGC();
+                } else {
+                    gcThread.suspendMajorGC();
+                    gcThread.suspendMinorGC();
+                }
             }
 
             @Override
@@ -147,14 +161,26 @@ class InterleavedLedgerStorage implements LedgerStorage, 
EntryLogListener {
 
             @Override
             public void diskWritable(File disk) {
-                // we have enough space now, disable force gc.
-                gcThread.disableForceGC();
+                // we have enough space now
+                if (gcThread.isForceGCAllowWhenNoSpace) {
+                    // disable force gc.
+                    gcThread.disableForceGC();
+                } else {
+                    // resume compaction to normal.
+                    gcThread.resumeMajorGC();
+                    gcThread.resumeMinorGC();
+                }
             }
 
             @Override
             public void diskJustWritable(File disk) {
-                // if a disk is just writable, we still need force gc.
-                gcThread.enableForceGC();
+                if (gcThread.isForceGCAllowWhenNoSpace) {
+                    // if a disk is just writable, we still need force gc.
+                    gcThread.enableForceGC();
+                } else {
+                    // still under warn threshold, only resume minor 
compaction.
+                    gcThread.resumeMinorGC();
+                }
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index cd9f7a0..9164616 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -47,6 +47,7 @@ public class ServerConfiguration extends 
AbstractConfiguration {
 
     // Gc Parameters
     protected final static String GC_WAIT_TIME = "gcWaitTime";
+    protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = 
"isForceGCAllowWhenNoSpace";
     // Sync Parameters
     protected final static String FLUSH_INTERVAL = "flushInterval";
     // Bookie death watch interval
@@ -783,6 +784,32 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Get whether force compaction is allowed when disk full or almost full.
+     *
+     * Force GC may get some space back, but may also fill up disk space more
+     * quickly. This is because new log files are created before GC, while old
+     * garbage log files deleted after GC. 
+     *
+     * @return true  - do force GC when disk full, 
+     *         false - suspend GC when disk full.
+     */
+    public boolean getIsForceGCAllowWhenNoSpace() {
+        return getBoolean(IS_FORCE_GC_ALLOW_WHEN_NO_SPACE, false);
+    }
+
+    /**
+     * Set whether force GC is allowed when disk full or almost full.
+     * 
+     * @param force true to allow force GC; false to suspend GC
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setIsForceGCAllowWhenNoSpace(boolean force) {
+        setProperty(IS_FORCE_GC_ALLOW_WHEN_NO_SPACE, force);
+        return this;
+    }
+
+    /**
      * Set the grace period which the rereplication worker will wait before
      * fencing and rereplicating a ledger fragment which is still being written
      * to, on bookie failure.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e0d33178/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6c9c4a7..101fdac 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -571,4 +571,59 @@ public class CompactionTest extends 
BookKeeperClusterTestCase {
         bb.flip();
         return bb;
     }
+
+    /**
+     * Suspend garbage collection when suspendMajor/suspendMinor is set.
+     */
+    @Test(timeout=60000)
+    public void testSuspendGarbageCollection() throws Exception {
+        ServerConfiguration conf = newServerConfiguration();
+        conf.setGcWaitTime(500);
+        conf.setMinorCompactionInterval(1);
+        conf.setMajorCompactionInterval(2);
+        LedgerDirsManager dirManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs());
+        CheckpointSource cp = new CheckpointSource() {
+            @Override
+            public Checkpoint newCheckpoint() {
+                // Do nothing.
+                return null;
+            }
+
+            @Override
+            public void checkpointComplete(Checkpoint checkPoint, boolean 
compact)
+                throws IOException {
+                // Do nothing.
+            }
+        };
+        Bookie.checkDirectoryStructure(conf.getJournalDir());
+        for (File dir : dirManager.getAllLedgerDirs()) {
+            Bookie.checkDirectoryStructure(dir);
+        }
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf,
+                        LedgerManagerFactory.newLedgerManagerFactory(conf, 
zkc).newLedgerManager(),
+                        dirManager, cp);
+        storage.start();
+        
+        // test suspend Major GC.
+        Thread.sleep(conf.getMajorCompactionInterval() * 1000
+                   + conf.getGcWaitTime());
+        storage.gcThread.suspendMajorGC();
+        Thread.sleep(1000);
+        long startTime = MathUtils.now();
+        Thread.sleep(conf.getMajorCompactionInterval() * 1000
+                   + conf.getGcWaitTime());
+        assertTrue("major compaction triggered while set suspend",
+                storage.gcThread.lastMajorCompactionTime < startTime); 
+
+        // test suspend Minor GC.
+        storage.gcThread.suspendMinorGC();
+        Thread.sleep(1000);
+        startTime = MathUtils.now();
+        Thread.sleep(conf.getMajorCompactionInterval() * 1000
+                   + conf.getGcWaitTime());
+        assertTrue("minor compaction triggered while set suspend",
+                storage.gcThread.lastMinorCompactionTime < startTime);
+        storage.gcThread.resumeMinorGC();
+        storage.gcThread.resumeMajorGC();
+    }
 }

Reply via email to