sijie closed pull request #1125: Issue #1124: Lower memory usage in GarbageCollectionThread while extracting all ledger meta data URL: https://github.com/apache/bookkeeper/pull/1125
This PR was merged from a forked repository; because GitHub hides the original diff of a merged foreign (forked) pull request, the diff is reproduced below for the sake of provenance: diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 4880fa354..79515ea4b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -389,16 +389,7 @@ private void doGcEntryLogs() { // Loop through all of the entry logs and remove the non-active ledgers. entryLogMetaMap.forEach((entryLogId, meta) -> { - meta.removeLedgerIf((entryLogLedger) -> { - // Remove the entry log ledger from the set if it isn't active. - try { - return !ledgerStorage.ledgerExists(entryLogLedger); - } catch (IOException e) { - LOG.error("Error reading from ledger storage", e); - return false; - } - }); - - removeIfLedgerNotExists(meta); if (meta.isEmpty()) { // This means the entry log is not associated with any active ledgers anymore. // We can remove this entry log file now. @@ -414,6 +405,18 @@ private void doGcEntryLogs() { this.numActiveEntryLogs = entryLogMetaMap.keySet().size(); } + private void removeIfLedgerNotExists(EntryLogMetadata meta) { + meta.removeLedgerIf((entryLogLedger) -> { + // Remove the entry log ledger from the set if it isn't active. + try { + return !ledgerStorage.ledgerExists(entryLogLedger); + } catch (IOException e) { + LOG.error("Error reading from ledger storage", e); + return false; + } + }); + } + /** * Compact entry logs if necessary.
* @@ -546,7 +549,12 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) { try { // Read through the entry log file and extract the entry log meta EntryLogMetadata entryLogMeta = entryLogger.getEntryLogMetadata(entryLogId); - entryLogMetaMap.put(entryLogId, entryLogMeta); + removeIfLedgerNotExists(entryLogMeta); + if (entryLogMeta.isEmpty()) { + entryLogger.removeEntryLog(entryLogId); + } else { + entryLogMetaMap.put(entryLogId, entryLogMeta); + } } catch (IOException e) { hasExceptionWhenScan = true; LOG.warn("Premature exception when processing " + entryLogId diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 6873a5c53..cbf844f26 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -39,8 +39,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -835,6 +837,74 @@ public void checkpointComplete(Checkpoint checkpoint, storage.gcThread.doCompactEntryLogs(threshold); } + /** + * Test extractMetaFromEntryLogs optimized method to avoid excess memory usage. + */ + public void testExtractMetaFromEntryLogs() throws Exception { + // Always run this test with Throttle enabled. 
+ baseConf.setIsThrottleByBytes(true); + // restart bookies + restartBookies(baseConf); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + File tmpDir = createTempDir("bkTest", ".dir"); + File curDir = Bookie.getCurrentDirectory(tmpDir); + Bookie.checkDirectoryStructure(curDir); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + + LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(), + new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); + final Set<Long> ledgers = Collections + .newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); + + LedgerManager manager = getLedgerManager(ledgers); + + CheckpointSource checkpointSource = new CheckpointSource() { + + @Override + public Checkpoint newCheckpoint() { + return null; + } + + @Override + public void checkpointComplete(Checkpoint checkpoint, + boolean compact) throws IOException { + } + }; + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); + storage.initialize(conf, manager, dirs, dirs, null, checkpointSource, + Checkpointer.NULL, NullStatsLogger.INSTANCE); + + for (long ledger = 0; ledger <= 10; ledger++) { + ledgers.add(ledger); + for (int entry = 1; entry <= 50; entry++) { + try { + storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE)); + } catch (IOException e) { + //ignore exception on failure to add entry. 
+ } + } + } + + storage.flush(); + storage.shutdown(); + + storage = new InterleavedLedgerStorage(); + storage.initialize(conf, manager, dirs, dirs, null, checkpointSource, + Checkpointer.NULL, NullStatsLogger.INSTANCE); + + long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId() + - storage.gcThread.scannedLogId; + LOG.info("The old Log Entry count is: " + startingEntriesCount); + + Map<Long, EntryLogMetadata> entryLogMetaData = new HashMap<>(); + long finalEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId() + - storage.gcThread.scannedLogId; + LOG.info("The latest Log Entry count is: " + finalEntriesCount); + + assertTrue("The GC did not clean up entries...", startingEntriesCount != finalEntriesCount); + assertTrue("Entries Count is zero", finalEntriesCount == 0); + } + private ByteBuf genEntry(long ledger, long entry, int size) { ByteBuf bb = Unpooled.buffer(size); bb.writeLong(ledger); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services