This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.17 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 67eacd0e570e3d490b2a1ee54e36a8b2e86dbe1d Author: Xiangying Meng <[email protected]> AuthorDate: Wed Sep 3 10:21:28 2025 +0800 Only stop Gc for the fill disk for DbLedgerStorage (#4661) * Only stop Gc for the fill disk for DbLedgerStorage (cherry picked from commit 0621ae6fce446c00f5becf0c81744bd8f9ec1c90) --- .../bookie/storage/ldb/DbLedgerStorage.java | 2 +- .../ldb/SingleDirectoryDbLedgerStorage.java | 40 +++++- .../bookie/BookieStorageThresholdTest.java | 135 +++++++++++++++++++++ 3 files changed, 171 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 60f752e226..e07a06b69c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -449,7 +449,7 @@ public class DbLedgerStorage implements LedgerStorage { } @VisibleForTesting - List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() { + public List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() { return ledgerStorageList; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index eb1372bc3a..1422743df5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.PrimitiveIterator.OfLong; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; @@ -149,6 +150,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final Counter flushExecutorTime; private final boolean singleLedgerDirs; + private final String ledgerBaseDir; + private final String indexBaseDir; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, @@ -158,8 +161,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage throws IOException { checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db implementation only allows for one storage dir"); - - String ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath(); + ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath(); // indexBaseDir default use ledgerBaseDir String indexBaseDir = ledgerBaseDir; if (CollectionUtils.isEmpty(indexDirsManager.getAllLedgerDirs()) @@ -172,6 +174,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage log.info("indexDir is specified a separate dir, creating single directory db ledger storage on {}", indexBaseDir); } + this.indexBaseDir = indexBaseDir; StatsLogger ledgerIndexDirStatsLogger = statsLogger .scopeLabel("ledgerDir", ledgerBaseDir) @@ -228,9 +231,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS); }); - ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); + ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener(ledgerBaseDir)); if (!ledgerBaseDir.equals(indexBaseDir)) { - indexDirsManager.addLedgerDirsListener(getLedgerDirsListener()); + indexDirsManager.addLedgerDirsListener(getLedgerDirsListener(indexBaseDir)); } } @@ -1148,11 +1151,19 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage "getListOfEntriesOfLedger method is currently unsupported for SingleDirectoryDbLedgerStorage"); } - private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() { + private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener(String diskPath) { return new LedgerDirsListener() { + private final String currentFilePath = diskPath; + + private boolean isCurrentFile(File disk) { + return Objects.equals(disk.getPath(), currentFilePath); + } @Override public void diskAlmostFull(File disk) { + if (!isCurrentFile(disk)) { + return; + } if (gcThread.isForceGCAllowWhenNoSpace()) { gcThread.enableForceGC(); } else { @@ -1162,6 +1173,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage @Override public void diskFull(File disk) { + if (!isCurrentFile(disk)) { + return; + } if (gcThread.isForceGCAllowWhenNoSpace()) { gcThread.enableForceGC(); } else { @@ -1182,6 +1196,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage @Override public void diskWritable(File disk) { + if (!isCurrentFile(disk)) { + return; + } // we have enough space now if (gcThread.isForceGCAllowWhenNoSpace()) { // disable force gc. @@ -1195,6 +1212,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage @Override public void diskJustWritable(File disk) { + if (!isCurrentFile(disk)) { + return; + } if (gcThread.isForceGCAllowWhenNoSpace()) { // if a disk is just writable, we still need force gc. gcThread.enableForceGC(); @@ -1297,4 +1317,14 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage DbLedgerStorageStats getDbLedgerStorageStats() { return dbLedgerStorageStats; } + + @VisibleForTesting + public String getLedgerBaseDir() { + return ledgerBaseDir; + } + + @VisibleForTesting + public String getIndexBaseDir() { + return indexBaseDir; + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java index b242a0d866..d322c99d87 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java @@ -21,21 +21,28 @@ package org.apache.bookkeeper.bookie; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.common.testing.annotations.FlakyTest; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.DiskChecker; @@ -250,4 +257,132 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase { Thread.sleep(500); assertFalse("Bookie should be transitioned to ReadWrite", bookie.isReadOnly()); } + + @org.junit.Test + public void testStopGCOnCorrespondingDiskWhenDiskFull() throws Exception { + // 1. Create test directories + File ledgerDir1 = tmpDirs.createNew("ledger", "test1"); + File ledgerDir2 = tmpDirs.createNew("ledger", "test2"); + + // 2. Configure Bookie + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(1000); + conf.setLedgerDirNames(new String[] { ledgerDir1.getPath(), ledgerDir2.getPath() }); + conf.setDiskCheckInterval(100); // Shorten disk check interval + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + + // 3. Start Bookie and obtain internal components + Bookie bookie = new TestBookieImpl(conf); + BookieImpl bookieImpl = (BookieImpl) bookie; + LedgerDirsManager ledgerDirsManager = bookieImpl.getLedgerDirsManager(); + + // 4. Create custom disk checker (key step) + GCTestDiskChecker diskChecker = new GCTestDiskChecker( + conf.getDiskUsageThreshold(), + conf.getDiskUsageWarnThreshold() + ); + // Set directory status: dir1 full (100%), dir2 normal (50%) + File[] currentDirectories = BookieImpl.getCurrentDirectories(new File[] { ledgerDir1, ledgerDir2 }); + diskChecker.setUsageMap(currentDirectories[0], 1.0f); // 100% usage + diskChecker.setUsageMap(currentDirectories[1], 0.5f); // 50% usage + + // 5. Replace Bookie's disk checker + bookieImpl.dirsMonitor.shutdown(); // Stop default monitor + bookieImpl.dirsMonitor = new LedgerDirsMonitor( + conf, + diskChecker, + Collections.singletonList(ledgerDirsManager) + ); + bookieImpl.dirsMonitor.start(); + + // 6. Add disk state listener + CountDownLatch dir1Full = new CountDownLatch(1); + CountDownLatch dir1Writable = new CountDownLatch(1); + + ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() { + @Override + public void diskFull(File disk) { + if (disk.equals(currentDirectories[0])) { + dir1Full.countDown(); + } + } + + @Override + public void diskWritable(File disk) { + if (disk.equals(currentDirectories[0])) { + dir1Writable.countDown(); + } + } + }); + + // 7. Wait for state update (ensure event is triggered) + assertTrue("dir1 did not trigger full state", dir1Full.await(30, TimeUnit.SECONDS)); + + // 8. Verify directory status + List<File> fullDirs = ledgerDirsManager.getFullFilledLedgerDirs(); + List<File> writableDirs = ledgerDirsManager.getWritableLedgerDirs(); + + assertTrue("dir1 should be marked as full", fullDirs.contains(currentDirectories[0])); + assertTrue("dir2 should remain writable", writableDirs.contains(currentDirectories[1])); + assertEquals("Only 1 writable directory should remain", 1, writableDirs.size()); + + // 9. Verify GC status + ((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> { + if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) { + assertTrue("dir1 should suspend minor GC", storage.isMinorGcSuspended()); + assertTrue("dir1 should suspend major GC", storage.isMajorGcSuspended()); + } else { + assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended()); + assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended()); + } + }); + + // 10. Restore dir1 status + diskChecker.setUsageMap(currentDirectories[0], 0.5f); // 50% usage + assertTrue("dir1 did not become writable again", dir1Writable.await(3, TimeUnit.SECONDS)); + + // 11. Verify GC status after recovery + ((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> { + if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) { + assertFalse("dir1 should not suspend minor GC", storage.isMinorGcSuspended()); + assertFalse("dir1 should not suspend major GC", storage.isMajorGcSuspended()); + } else { + assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended()); + assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended()); + } + }); + + // 12. Cleanup + bookie.shutdown(); + } + + // Custom disk checker (simulate different usage for directories) + static class GCTestDiskChecker extends DiskChecker { + private final Map<File, Float> usageMap = new ConcurrentHashMap<>(); + + public GCTestDiskChecker(float threshold, float warnThreshold) { + super(threshold, warnThreshold); + } + + // Set simulated usage for a directory + public void setUsageMap(File dir, float usage) { + usageMap.put(dir, usage); + } + + @Override + public float checkDir(File dir) throws DiskErrorException, DiskWarnThresholdException, DiskOutOfSpaceException { + Float usage = usageMap.get(dir); + if (usage == null) { + return super.checkDir(dir); // Default behavior + } + // Throw exception based on preset usage rate + if (usage >= 1.0) { + throw new DiskOutOfSpaceException("Simulated disk full", usage); + } else if (usage >= 0.9) { + throw new DiskWarnThresholdException("Simulated disk warning", usage); + } + return usage; + } + } + }
