This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee33c99606e [improve][broker] PIP-444: Rate limit for deleting ledger 
to alleviate the zk pressure. (#24760)
ee33c99606e is described below

commit ee33c99606e8f573dc55f47529e7c209bd8194e3
Author: Wenzhi Feng <[email protected]>
AuthorDate: Sat Oct 11 15:28:31 2025 +0800

    [improve][broker] PIP-444: Rate limit for deleting ledger to alleviate the 
zk pressure. (#24760)
    
    Co-authored-by: fengwenzhi <[email protected]>
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 28 +++++++++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 47 +++++++++++++++++++---
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 +++
 .../pulsar/broker/service/BrokerService.java       | 25 ++++++++++++
 4 files changed, 100 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index bd4c1a5014e..8fbf1cfda0d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -23,6 +23,8 @@ import java.nio.charset.StandardCharsets;
 import java.time.Clock;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.Setter;
@@ -61,6 +63,8 @@ public class ManagedLedgerConfig {
     private int metadataMaxEntriesPerLedger = 50000;
     private int ledgerRolloverTimeout = 4 * 3600;
     private double throttleMarkDelete = 0;
+    private Semaphore ledgerDeletionSemaphore;
+    private ExecutorService ledgerDeleteExecutor;
     private long retentionTimeMs = 0;
     private long retentionSizeInMB = 0;
     private boolean autoSkipNonRecoverableData;
@@ -410,6 +414,30 @@ public class ManagedLedgerConfig {
         return this;
     }
 
+    /**
+     * Returns the semaphore that bounds how many ledger deletions may be in flight at once.
+     *
+     * @return the semaphore used to limit concurrent ledger deletions, or {@code null} when no
+     *         limit has been configured (the field has no default value)
+     */
+    public Semaphore getLedgerDeletionSemaphore() {
+        return ledgerDeletionSemaphore;
+    }
+
+    /**
+     * Sets the semaphore used to limit concurrent ledger deletions.
+     *
+     * @param semaphore the semaphore to use; {@code null} leaves ledger deletion unthrottled
+     * @return this config instance, to allow call chaining
+     */
+    public ManagedLedgerConfig setLedgerDeletionSemaphore(Semaphore semaphore) 
{
+        this.ledgerDeletionSemaphore = semaphore;
+        return this;
+    }
+
+    /**
+     * Returns the executor on which throttled ledger deletions are submitted.
+     *
+     * @return the executor service to be used for deleting ledgers, or {@code null} when none
+     *         has been configured (the field has no default value)
+     */
+    public ExecutorService getLedgerDeleteExecutor() {
+        return ledgerDeleteExecutor;
+    }
+
+    /**
+     * Sets the executor service to be used for deleting ledgers.
+     *
+     * @param executor the executor to use; may be {@code null} when no deletion semaphore is set
+     * @return this config instance, to allow call chaining
+     */
+    public ManagedLedgerConfig setLedgerDeleteExecutor(ExecutorService 
executor) {
+        this.ledgerDeleteExecutor = executor;
+        return this;
+    }
+
     /**
      * Set the retention time for the ManagedLedger.
      * <p>
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d157677d210..346ddb2f6c9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -57,6 +57,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -354,6 +355,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private long lastEvictOffloadedLedgers;
     private static final int MINIMUM_EVICTION_INTERVAL_DIVIDER = 10;
 
+    // Semaphore to limit concurrent ledger deletion
+    private Semaphore deleteLedgerSemaphore = null;
+    // Executor service for executing ledger deletion tasks
+    private ExecutorService deleteLedgerExecutor = null;
+
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper 
bookKeeper, MetaStore store,
             ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
             final String name) {
@@ -402,6 +408,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         this.minBacklogEntriesForCaching = 
config.getMinimumBacklogEntriesForCaching();
         this.maxBacklogBetweenCursorsForCaching = 
config.getMaxBacklogBetweenCursorsForCaching();
         this.managedLedgerAttributes = new ManagedLedgerAttributes(this);
+        if (config.getLedgerDeletionSemaphore() != null) {
+            this.deleteLedgerSemaphore = config.getLedgerDeletionSemaphore();
+            this.deleteLedgerExecutor = config.getLedgerDeleteExecutor();
+        }
     }
 
     synchronized void initialize(final ManagedLedgerInitializeLedgerCallback 
callback, final Object ctx) {
@@ -569,7 +579,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             public void operationComplete(Void v, Stat stat) {
                 ledgersStat = stat;
                 emptyLedgersToBeDeleted.forEach(ledgerId -> {
-                    bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+                    asyncDeleteLedgerWithConcurrencyLimit(ledgerId, (rc, ctx) 
-> {
                         log.info("[{}] Deleted empty ledger ledgerId={} 
rc={}", name, ledgerId, rc);
                     }, null);
                 });
@@ -1763,7 +1773,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     log.warn("[{}] Error updating meta data with the new list 
of ledgers: {}", name, e.getMessage());
                     handleBadVersion(e);
                     mbean.startDataLedgerDeleteOp();
-                    bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
+                    asyncDeleteLedgerWithConcurrencyLimit(lh.getId(), (rc1, 
ctx1) -> {
                         mbean.endDataLedgerDeleteOp();
                         if (rc1 != BKException.Code.OK) {
                             log.warn("[{}] Failed to delete ledger {}: {}", 
name, lh.getId(),
@@ -1846,7 +1856,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         STATE_UPDATER.set(this, State.LedgerOpened);
         // Delete original "currentLedger" if it has been removed from 
"ledgers".
         if (originalCurrentLedger != null && 
!ledgers.containsKey(originalCurrentLedger.getId())){
-            bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc, 
ctx) -> {
+            
asyncDeleteLedgerWithConcurrencyLimit(originalCurrentLedger.getId(), (rc, ctx) 
-> {
                 mbean.endDataLedgerDeleteOp();
                 log.info("[{}] Delete complete for empty ledger {}. rc={}", 
name, originalCurrentLedger.getId(), rc);
             }, null);
@@ -3385,7 +3395,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, 
long ledgerId, long retry) {
-        bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+        asyncDeleteLedgerWithConcurrencyLimit(ledgerId, (rc, ctx) -> {
             if (isNoSuchLedgerExistsException(rc)) {
                 log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
                 future.complete(null);
@@ -3408,6 +3418,33 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }, null);
     }
 
+    /**
+     * Delete a ledger asynchronously, applying a concurrency limit if configured.
+     *
+     * <p>When a deletion semaphore is configured, the request is handed to the deletion executor,
+     * which blocks until a permit is available before calling BookKeeper. The permit is released
+     * when BookKeeper invokes the completion callback. Without a semaphore the request goes
+     * straight to BookKeeper.
+     *
+     * <p>If the executor thread is interrupted while waiting for a permit, the interrupt flag is
+     * restored and the callback is completed with {@code BKException.Code.InterruptedException},
+     * so callers that depend on the callback are not left waiting forever.
+     *
+     * @param ledgerId id of the ledger to delete
+     * @param cb callback invoked with the result code once the deletion completes or fails
+     * @param ctx opaque context object handed back to the callback
+     */
+    private void asyncDeleteLedgerWithConcurrencyLimit(long ledgerId,
+                                                       AsyncCallback.DeleteCallback cb,
+                                                       Object ctx) {
+        if (deleteLedgerSemaphore == null) {
+            // No limit configured: delete directly.
+            bookKeeper.asyncDeleteLedger(ledgerId, cb, ctx);
+            return;
+        }
+        // Release the permit before notifying the caller, whatever the result code is.
+        AsyncCallback.DeleteCallback cbWrapper = (rc, ctx1) -> {
+            deleteLedgerSemaphore.release();
+            cb.deleteComplete(rc, ctx1);
+        };
+        deleteLedgerExecutor.execute(() -> {
+            try {
+                deleteLedgerSemaphore.acquire();
+            } catch (InterruptedException e) {
+                // No permit was acquired, so report through the raw callback rather than the
+                // releasing wrapper, and keep the interrupt visible to the executor thread.
+                Thread.currentThread().interrupt();
+                log.error("[{}] Interrupted while waiting to delete ledger {}", name, ledgerId, e);
+                cb.deleteComplete(BKException.Code.InterruptedException, ctx);
+                return;
+            }
+            bookKeeper.asyncDeleteLedger(ledgerId, cbWrapper, ctx);
+        });
+    }
+
     @SuppressWarnings("checkstyle:fallthrough")
     private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
         List<LedgerInfo> ledgers = 
Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
@@ -3422,7 +3459,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Deleting ledger {}", name, ls);
             }
-            bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> {
+            asyncDeleteLedgerWithConcurrencyLimit(ls.getLedgerId(), (rc, ctx1) 
-> {
                 switch (rc) {
                 case Code.NoSuchLedgerExistsException:
                 case Code.NoSuchLedgerExistsOnMetadataServerException:
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5ca0db944a4..30fef55ece3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2262,6 +2262,11 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "Rate limit the amount of writes per second generated by 
consumer acking the messages"
     )
     private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            doc = "Max number of concurrent requests for deleting ledgers at 
broker level"
+    )
+    private int managedLedgerDeleteMaxConcurrentRequests = 1000;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5c19de44341..79dffdf7aad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -329,6 +329,12 @@ public class BrokerService implements Closeable {
     private final TopicEventsDispatcher topicEventsDispatcher = new 
TopicEventsDispatcher();
     private volatile boolean unloaded = false;
 
+    // Semaphore limiting the concurrency of ledger deletion at the broker level;
+    // all managed ledgers share this same semaphore.
+    private final Semaphore ledgerDeletionSemaphore;
+
+    private final ExecutorProvider ledgerDeletionExecutorProvider;
+
     public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
         this.pulsar = pulsar;
         this.clock = pulsar.getClock();
@@ -451,6 +457,16 @@ public class BrokerService implements Closeable {
                         .getBrokerEntryPayloadProcessors(), 
BrokerService.class.getClassLoader());
 
         this.bundlesQuotas = new BundlesQuotas(pulsar);
+        if 
(pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests() > 0) {
+            log.info("Setting managed ledger deletion max concurrent requests 
to {}",
+                    
pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests());
+            this.ledgerDeletionSemaphore = new Semaphore(
+                    
pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests());
+            this.ledgerDeletionExecutorProvider = new ExecutorProvider(1, 
"pulsar-ledger-deletion");
+        } else {
+            this.ledgerDeletionSemaphore = null;
+            this.ledgerDeletionExecutorProvider = null;
+        }
     }
 
     protected DispatchRateLimiterFactory 
createDispatchRateLimiterFactory(ServiceConfiguration config)
@@ -802,6 +818,12 @@ public class BrokerService implements Closeable {
         try {
             log.info("Shutting down Pulsar Broker service");
 
+            // shutdown executor for ledger deletion
+            if (ledgerDeletionExecutorProvider != null) {
+                log.info("Shutting down executor for ledger deletion...");
+                ledgerDeletionExecutorProvider.shutdownNow();
+            }
+
             // unregister non-static metrics collectors
             pendingTopicLoadRequests.unregister();
             pendingLookupRequests.unregister();
@@ -2057,6 +2079,9 @@ public class BrokerService implements Closeable {
             
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
 >= 0
                     ? persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
                     : 
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit());
+            
managedLedgerConfig.setLedgerDeletionSemaphore(this.ledgerDeletionSemaphore);
+            
managedLedgerConfig.setLedgerDeleteExecutor(this.ledgerDeletionExecutorProvider 
!= null
+                    ? this.ledgerDeletionExecutorProvider.getExecutor() : 
null);
             
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
             
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
 

Reply via email to