sijie closed pull request #1125: Issue #1124: Lower memory usage in 
GarbageCollectorThread while extracting all ledger metadata
URL: https://github.com/apache/bookkeeper/pull/1125
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff once it has been merged, so the diff is reproduced below:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4880fa354..79515ea4b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -389,16 +389,7 @@ private void doGcEntryLogs() {
 
         // Loop through all of the entry logs and remove the non-active 
ledgers.
         entryLogMetaMap.forEach((entryLogId, meta) -> {
-            meta.removeLedgerIf((entryLogLedger) -> {
-                // Remove the entry log ledger from the set if it isn't active.
-               try {
-                   return !ledgerStorage.ledgerExists(entryLogLedger);
-               } catch (IOException e) {
-                   LOG.error("Error reading from ledger storage", e);
-                   return false;
-               }
-           });
-
+           removeIfLedgerNotExists(meta);
            if (meta.isEmpty()) {
                // This means the entry log is not associated with any active 
ledgers anymore.
                // We can remove this entry log file now.
@@ -414,6 +405,18 @@ private void doGcEntryLogs() {
         this.numActiveEntryLogs = entryLogMetaMap.keySet().size();
     }
 
+    private void removeIfLedgerNotExists(EntryLogMetadata meta) {
+        meta.removeLedgerIf((entryLogLedger) -> {
+            // Remove the entry log ledger from the set if it isn't active.
+            try {
+                return !ledgerStorage.ledgerExists(entryLogLedger);
+            } catch (IOException e) {
+                LOG.error("Error reading from ledger storage", e);
+                return false;
+            }
+        });
+    }
+
     /**
      * Compact entry logs if necessary.
      *
@@ -546,7 +549,12 @@ protected void compactEntryLog(EntryLogMetadata 
entryLogMeta) {
             try {
                 // Read through the entry log file and extract the entry log 
meta
                 EntryLogMetadata entryLogMeta = 
entryLogger.getEntryLogMetadata(entryLogId);
-                entryLogMetaMap.put(entryLogId, entryLogMeta);
+                removeIfLedgerNotExists(entryLogMeta);
+                if (entryLogMeta.isEmpty()) {
+                    entryLogger.removeEntryLog(entryLogId);
+                } else {
+                    entryLogMetaMap.put(entryLogId, entryLogMeta);
+                }
             } catch (IOException e) {
                 hasExceptionWhenScan = true;
                 LOG.warn("Premature exception when processing " + entryLogId
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6873a5c53..cbf844f26 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -39,8 +39,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -835,6 +837,74 @@ public void checkpointComplete(Checkpoint checkpoint,
         storage.gcThread.doCompactEntryLogs(threshold);
     }
 
+    /**
+     * Test extractMetaFromEntryLogs optimized method to avoid excess memory 
usage.
+     */
+    public void testExtractMetaFromEntryLogs() throws Exception {
+        // Always run this test with Throttle enabled.
+        baseConf.setIsThrottleByBytes(true);
+        // restart bookies
+        restartBookies(baseConf);
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+
+        LedgerDirsManager dirs = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+            new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        final Set<Long> ledgers = Collections
+            .newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+
+        LedgerManager manager = getLedgerManager(ledgers);
+
+        CheckpointSource checkpointSource = new CheckpointSource() {
+
+            @Override
+            public Checkpoint newCheckpoint() {
+                return null;
+            }
+
+            @Override
+            public void checkpointComplete(Checkpoint checkpoint,
+                                           boolean compact) throws IOException 
{
+            }
+        };
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
+            Checkpointer.NULL, NullStatsLogger.INSTANCE);
+
+        for (long ledger = 0; ledger <= 10; ledger++) {
+            ledgers.add(ledger);
+            for (int entry = 1; entry <= 50; entry++) {
+                try {
+                    storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE));
+                } catch (IOException e) {
+                    //ignore exception on failure to add entry.
+                }
+            }
+        }
+
+        storage.flush();
+        storage.shutdown();
+
+        storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
+                           Checkpointer.NULL, NullStatsLogger.INSTANCE);
+
+        long startingEntriesCount = 
storage.gcThread.entryLogger.getLeastUnflushedLogId()
+            - storage.gcThread.scannedLogId;
+        LOG.info("The old Log Entry count is: " + startingEntriesCount);
+
+        Map<Long, EntryLogMetadata> entryLogMetaData = new HashMap<>();
+        long finalEntriesCount = 
storage.gcThread.entryLogger.getLeastUnflushedLogId()
+            - storage.gcThread.scannedLogId;
+        LOG.info("The latest Log Entry count is: " + finalEntriesCount);
+
+        assertTrue("The GC did not clean up entries...", startingEntriesCount 
!= finalEntriesCount);
+        assertTrue("Entries Count is zero", finalEntriesCount == 0);
+    }
+
     private ByteBuf genEntry(long ledger, long entry, int size) {
         ByteBuf bb = Unpooled.buffer(size);
         bb.writeLong(ledger);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to