This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 28de2a8  Issue #1124: Lower memory usage in GarbageCollectionThread 
while extracting all ledger meta data
28de2a8 is described below

commit 28de2a8d626f43eda39af00e452e3c12c98c5cba
Author: Kishor Patil <[email protected]>
AuthorDate: Sat Feb 17 00:58:52 2018 +0800

    Issue #1124: Lower memory usage in GarbageCollectionThread while extracting 
all ledger meta data
    
    Descriptions of the changes in this PR:
    
    The PR contains the fix to cleanup non-existent ledger log entries from 
EntryLogMetadata while extracting all log entries.
    
    Master Issue: #1124
    
    Author: Kishor Patil <[email protected]>
    
    Reviewers: Yiming Zang <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #1125 from kishorvpatil/gcThreadFix, closes #1124
---
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 30 ++++++----
 .../apache/bookkeeper/bookie/CompactionTest.java   | 70 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 11 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4880fa3..79515ea 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -389,16 +389,7 @@ public class GarbageCollectorThread extends SafeRunnable {
 
         // Loop through all of the entry logs and remove the non-active 
ledgers.
         entryLogMetaMap.forEach((entryLogId, meta) -> {
-            meta.removeLedgerIf((entryLogLedger) -> {
-                // Remove the entry log ledger from the set if it isn't active.
-               try {
-                   return !ledgerStorage.ledgerExists(entryLogLedger);
-               } catch (IOException e) {
-                   LOG.error("Error reading from ledger storage", e);
-                   return false;
-               }
-           });
-
+           removeIfLedgerNotExists(meta);
            if (meta.isEmpty()) {
                // This means the entry log is not associated with any active 
ledgers anymore.
                // We can remove this entry log file now.
@@ -414,6 +405,18 @@ public class GarbageCollectorThread extends SafeRunnable {
         this.numActiveEntryLogs = entryLogMetaMap.keySet().size();
     }
 
+    private void removeIfLedgerNotExists(EntryLogMetadata meta) {
+        meta.removeLedgerIf((entryLogLedger) -> {
+            // Remove the entry log ledger from the set if it isn't active.
+            try {
+                return !ledgerStorage.ledgerExists(entryLogLedger);
+            } catch (IOException e) {
+                LOG.error("Error reading from ledger storage", e);
+                return false;
+            }
+        });
+    }
+
     /**
      * Compact entry logs if necessary.
      *
@@ -546,7 +549,12 @@ public class GarbageCollectorThread extends SafeRunnable {
             try {
                 // Read through the entry log file and extract the entry log 
meta
                 EntryLogMetadata entryLogMeta = 
entryLogger.getEntryLogMetadata(entryLogId);
-                entryLogMetaMap.put(entryLogId, entryLogMeta);
+                removeIfLedgerNotExists(entryLogMeta);
+                if (entryLogMeta.isEmpty()) {
+                    entryLogger.removeEntryLog(entryLogId);
+                } else {
+                    entryLogMetaMap.put(entryLogId, entryLogMeta);
+                }
             } catch (IOException e) {
                 hasExceptionWhenScan = true;
                 LOG.warn("Premature exception when processing " + entryLogId
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6873a5c..cbf844f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -39,8 +39,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -835,6 +837,74 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
         storage.gcThread.doCompactEntryLogs(threshold);
     }
 
+    /**
+     * Test extractMetaFromEntryLogs optimized method to avoid excess memory 
usage.
+     */
+    public void testExtractMetaFromEntryLogs() throws Exception {
+        // Always run this test with Throttle enabled.
+        baseConf.setIsThrottleByBytes(true);
+        // restart bookies
+        restartBookies(baseConf);
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+
+        LedgerDirsManager dirs = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+            new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        final Set<Long> ledgers = Collections
+            .newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+
+        LedgerManager manager = getLedgerManager(ledgers);
+
+        CheckpointSource checkpointSource = new CheckpointSource() {
+
+            @Override
+            public Checkpoint newCheckpoint() {
+                return null;
+            }
+
+            @Override
+            public void checkpointComplete(Checkpoint checkpoint,
+                                           boolean compact) throws IOException 
{
+            }
+        };
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
+            Checkpointer.NULL, NullStatsLogger.INSTANCE);
+
+        for (long ledger = 0; ledger <= 10; ledger++) {
+            ledgers.add(ledger);
+            for (int entry = 1; entry <= 50; entry++) {
+                try {
+                    storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE));
+                } catch (IOException e) {
+                    //ignore exception on failure to add entry.
+                }
+            }
+        }
+
+        storage.flush();
+        storage.shutdown();
+
+        storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
+                           Checkpointer.NULL, NullStatsLogger.INSTANCE);
+
+        long startingEntriesCount = 
storage.gcThread.entryLogger.getLeastUnflushedLogId()
+            - storage.gcThread.scannedLogId;
+        LOG.info("The old Log Entry count is: " + startingEntriesCount);
+
+        Map<Long, EntryLogMetadata> entryLogMetaData = new HashMap<>();
+        long finalEntriesCount = 
storage.gcThread.entryLogger.getLeastUnflushedLogId()
+            - storage.gcThread.scannedLogId;
+        LOG.info("The latest Log Entry count is: " + finalEntriesCount);
+
+        assertTrue("The GC did not clean up entries...", startingEntriesCount 
!= finalEntriesCount);
+        assertTrue("Entries Count is zero", finalEntriesCount == 0);
+    }
+
     private ByteBuf genEntry(long ledger, long entry, int size) {
         ByteBuf bb = Unpooled.buffer(size);
         bb.writeLong(ledger);

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to