This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 67eacd0e570e3d490b2a1ee54e36a8b2e86dbe1d
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Sep 3 10:21:28 2025 +0800

    Only stop Gc for the fill disk for DbLedgerStorage (#4661)
    
    * Only stop Gc for the fill disk for DbLedgerStorage
    
    (cherry picked from commit 0621ae6fce446c00f5becf0c81744bd8f9ec1c90)
---
 .../bookie/storage/ldb/DbLedgerStorage.java        |   2 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  40 +++++-
 .../bookie/BookieStorageThresholdTest.java         | 135 +++++++++++++++++++++
 3 files changed, 171 insertions(+), 6 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 60f752e226..e07a06b69c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -449,7 +449,7 @@ public class DbLedgerStorage implements LedgerStorage {
     }
 
     @VisibleForTesting
-    List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
+    public List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
         return ledgerStorageList;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index eb1372bc3a..1422743df5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -39,6 +39,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -149,6 +150,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
 
     private final Counter flushExecutorTime;
     private final boolean singleLedgerDirs;
+    private final String ledgerBaseDir;
+    private final String indexBaseDir;
 
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
@@ -158,8 +161,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
-
-        String ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath();
+        ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath();
         // indexBaseDir default use ledgerBaseDir
         String indexBaseDir = ledgerBaseDir;
         if (CollectionUtils.isEmpty(indexDirsManager.getAllLedgerDirs())
@@ -172,6 +174,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             log.info("indexDir is specified a separate dir, creating single directory db ledger storage on {}",
                     indexBaseDir);
         }
+        this.indexBaseDir = indexBaseDir;
 
         StatsLogger ledgerIndexDirStatsLogger = statsLogger
                 .scopeLabel("ledgerDir", ledgerBaseDir)
@@ -228,9 +231,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
         });
 
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener(ledgerBaseDir));
         if (!ledgerBaseDir.equals(indexBaseDir)) {
-            indexDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+            indexDirsManager.addLedgerDirsListener(getLedgerDirsListener(indexBaseDir));
         }
     }
 
@@ -1148,11 +1151,19 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                 "getListOfEntriesOfLedger method is currently unsupported for SingleDirectoryDbLedgerStorage");
     }
 
-    private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() {
+    private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener(String diskPath) {
         return new LedgerDirsListener() {
+            private final String currentFilePath = diskPath;
+
+            private boolean isCurrentFile(File disk) {
+                return Objects.equals(disk.getPath(), currentFilePath);
+            }
 
             @Override
             public void diskAlmostFull(File disk) {
+                if (!isCurrentFile(disk)) {
+                    return;
+                }
                 if (gcThread.isForceGCAllowWhenNoSpace()) {
                     gcThread.enableForceGC();
                 } else {
@@ -1162,6 +1173,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
 
             @Override
             public void diskFull(File disk) {
+                if (!isCurrentFile(disk)) {
+                    return;
+                }
                 if (gcThread.isForceGCAllowWhenNoSpace()) {
                     gcThread.enableForceGC();
                 } else {
@@ -1182,6 +1196,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
 
             @Override
             public void diskWritable(File disk) {
+                if (!isCurrentFile(disk)) {
+                    return;
+                }
                 // we have enough space now
                 if (gcThread.isForceGCAllowWhenNoSpace()) {
                     // disable force gc.
@@ -1195,6 +1212,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
 
             @Override
             public void diskJustWritable(File disk) {
+                if (!isCurrentFile(disk)) {
+                    return;
+                }
                 if (gcThread.isForceGCAllowWhenNoSpace()) {
                     // if a disk is just writable, we still need force gc.
                     gcThread.enableForceGC();
@@ -1297,4 +1317,14 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     DbLedgerStorageStats getDbLedgerStorageStats() {
         return dbLedgerStorageStats;
     }
+
+    @VisibleForTesting
+    public String getLedgerBaseDir() {
+        return ledgerBaseDir;
+    }
+
+    @VisibleForTesting
+    public String getIndexBaseDir() {
+        return indexBaseDir;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index b242a0d866..d322c99d87 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -21,21 +21,28 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -250,4 +257,132 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
         Thread.sleep(500);
         assertFalse("Bookie should be transitioned to ReadWrite", bookie.isReadOnly());
     }
+
+    @org.junit.Test
+    public void testStopGCOnCorrespondingDiskWhenDiskFull() throws Exception {
+        // 1. Create test directories
+        File ledgerDir1 = tmpDirs.createNew("ledger", "test1");
+        File ledgerDir2 = tmpDirs.createNew("ledger", "test2");
+
+        // 2. Configure Bookie
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(1000);
+        conf.setLedgerDirNames(new String[] { ledgerDir1.getPath(), ledgerDir2.getPath() });
+        conf.setDiskCheckInterval(100); // Shorten disk check interval
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+
+        // 3. Start Bookie and obtain internal components
+        Bookie bookie = new TestBookieImpl(conf);
+        BookieImpl bookieImpl = (BookieImpl) bookie;
+        LedgerDirsManager ledgerDirsManager = bookieImpl.getLedgerDirsManager();
+
+        // 4. Create custom disk checker (key step)
+        GCTestDiskChecker diskChecker = new GCTestDiskChecker(
+                conf.getDiskUsageThreshold(),
+                conf.getDiskUsageWarnThreshold()
+        );
+        // Set directory status: dir1 full (100%), dir2 normal (50%)
+        File[] currentDirectories = BookieImpl.getCurrentDirectories(new File[] { ledgerDir1, ledgerDir2 });
+        diskChecker.setUsageMap(currentDirectories[0], 1.0f);  // 100% usage
+        diskChecker.setUsageMap(currentDirectories[1], 0.5f);  // 50% usage
+
+        // 5. Replace Bookie's disk checker
+        bookieImpl.dirsMonitor.shutdown(); // Stop default monitor
+        bookieImpl.dirsMonitor = new LedgerDirsMonitor(
+                conf,
+                diskChecker,
+                Collections.singletonList(ledgerDirsManager)
+        );
+        bookieImpl.dirsMonitor.start();
+
+        // 6. Add disk state listener
+        CountDownLatch dir1Full = new CountDownLatch(1);
+        CountDownLatch dir1Writable = new CountDownLatch(1);
+
+        ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                if (disk.equals(currentDirectories[0])) {
+                    dir1Full.countDown();
+                }
+            }
+
+            @Override
+            public void diskWritable(File disk) {
+                if (disk.equals(currentDirectories[0])) {
+                    dir1Writable.countDown();
+                }
+            }
+        });
+
+        // 7. Wait for state update (ensure event is triggered)
+        assertTrue("dir1 did not trigger full state", dir1Full.await(30, TimeUnit.SECONDS));
+
+        // 8. Verify directory status
+        List<File> fullDirs = ledgerDirsManager.getFullFilledLedgerDirs();
+        List<File> writableDirs = ledgerDirsManager.getWritableLedgerDirs();
+
+        assertTrue("dir1 should be marked as full", fullDirs.contains(currentDirectories[0]));
+        assertTrue("dir2 should remain writable", writableDirs.contains(currentDirectories[1]));
+        assertEquals("Only 1 writable directory should remain", 1, writableDirs.size());
+
+        // 9. Verify GC status
+        ((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> {
+            if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) {
+                assertTrue("dir1 should suspend minor GC", storage.isMinorGcSuspended());
+                assertTrue("dir1 should suspend major GC", storage.isMajorGcSuspended());
+            } else {
+                assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended());
+                assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended());
+            }
+        });
+
+        // 10. Restore dir1 status
+        diskChecker.setUsageMap(currentDirectories[0], 0.5f);  // 50% usage
+        assertTrue("dir1 did not become writable again", dir1Writable.await(3, TimeUnit.SECONDS));
+
+        // 11. Verify GC status after recovery
+        ((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> {
+            if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) {
+                assertFalse("dir1 should not suspend minor GC", storage.isMinorGcSuspended());
+                assertFalse("dir1 should not suspend major GC", storage.isMajorGcSuspended());
+            } else {
+                assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended());
+                assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended());
+            }
+        });
+
+        // 12. Cleanup
+        bookie.shutdown();
+    }
+
+    // Custom disk checker (simulate different usage for directories)
+    static class GCTestDiskChecker extends DiskChecker {
+        private final Map<File, Float> usageMap = new ConcurrentHashMap<>();
+
+        public GCTestDiskChecker(float threshold, float warnThreshold) {
+            super(threshold, warnThreshold);
+        }
+
+        // Set simulated usage for a directory
+        public void setUsageMap(File dir, float usage) {
+            usageMap.put(dir, usage);
+        }
+
+        @Override
+        public float checkDir(File dir) throws DiskErrorException, DiskWarnThresholdException, DiskOutOfSpaceException {
+            Float usage = usageMap.get(dir);
+            if (usage == null) {
+                return super.checkDir(dir); // Default behavior
+            }
+            // Throw exception based on preset usage rate
+            if (usage >= 1.0) {
+                throw new DiskOutOfSpaceException("Simulated disk full", usage);
+            } else if (usage >= 0.9) {
+                throw new DiskWarnThresholdException("Simulated disk warning", usage);
+            }
+            return usage;
+        }
+    }
+
 }

Reply via email to