This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 083fd4cb6fa [improve][broker] Default 
managedLedgerMaxReadsInFlightSizeInMB to 15% of direct memory (#25979)
083fd4cb6fa is described below

commit 083fd4cb6fa7585268a0534ecbc70746348e316b
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Jun 9 10:00:56 2026 +0300

    [improve][broker] Default managedLedgerMaxReadsInFlightSizeInMB to 15% of 
direct memory (#25979)
---
 conf/broker.conf                                   | 19 ++++++----
 conf/standalone.conf                               | 13 +++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 15 +++++---
 .../pulsar/broker/ManagedLedgerClientFactory.java  | 41 +++++++++++++++-------
 .../api/KeySharedSubscriptionBrokerCacheTest.java  |  2 +-
 ...ySharedSubscriptionDisabledBrokerCacheTest.java |  2 +-
 6 files changed, 67 insertions(+), 25 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8ed01cc8656..4826bbeb5c7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1473,13 +1473,18 @@ managedLedgerCursorMaxEntriesPerLedger=50000
 # Max time before triggering a rollover on a cursor ledger
 managedLedgerCursorRolloverTimeInSeconds=14400
 
-# Maximum amount of memory used hold data read from storage (or from the 
cache).
-# This mechanism prevents the broker to have too many concurrent
-# reads from storage and fall into Out of Memory errors in case
-# of multiple concurrent reads to multiple concurrent consumers.
-# Set 0 in order to disable the feature.
-#
-managedLedgerMaxReadsInFlightSizeInMB=0
+# Maximum buffer size in MB for bytes read from storage (or from the cache).
+# This is the memory retained by data read from storage (or cache) until it 
has been
+# delivered to the Consumer Netty channel. This provides backpressure for 
BookKeeper and
+# tiered storage reads, preventing the broker from having too many concurrent 
reads and
+# running into Out of Memory errors when there are multiple concurrent reads 
to multiple
+# concurrent consumers.
+# When left unset (empty), it defaults to the greater of 
dispatcherMaxReadSizeBytes and 15% of available
+# JVM direct memory; dispatcherMaxReadSizeBytes is the minimum value so the 
limiter can never block the
+# completion of a single read.
+# Set to 0 to disable the feature.
+# Set to a value greater than 0 to use that many MB.
+managedLedgerMaxReadsInFlightSizeInMB=
 
 # Max number of "acknowledgment holes" that are going to be persistently 
stored.
 # When acknowledging out of order, a consumer will leave holes that are 
supposed
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 38c5d1a5b89..2dba365a5a7 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -914,6 +914,19 @@ managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed=true
 # The default value is 2 * managedLedgerCacheEvictionTimeThresholdMillis.
 managedLedgerContinueCachingAddedEntriesAfterLastActiveCursorLeavesMillis=
 
+# Maximum buffer size in MB for bytes read from storage (or from the cache).
+# This is the memory retained by data read from storage (or cache) until it 
has been
+# delivered to the Consumer Netty channel. This provides backpressure for 
BookKeeper and
+# tiered storage reads, preventing the broker from having too many concurrent 
reads and
+# running into Out of Memory errors when there are multiple concurrent reads 
to multiple
+# concurrent consumers.
+# When left unset (empty), it defaults to the greater of 
dispatcherMaxReadSizeBytes and 15% of available
+# JVM direct memory; dispatcherMaxReadSizeBytes is the minimum value so the 
limiter can never block the
+# completion of a single read.
+# Set to 0 to disable the feature.
+# Set to a value greater than 0 to use that many MB.
+managedLedgerMaxReadsInFlightSizeInMB=
+
 # Configure the threshold (in number of entries) from where a cursor should be 
considered 'backlogged'
 # and thus should be set as inactive.
 # This has no effect when cacheEvictionByExpectedReadCount is enabled.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 7771b43059f..48be38ad809 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2335,10 +2335,17 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + "inserting in cache")
     private boolean managedLedgerCacheCopyEntries = false;
 
-    @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size 
for bytes read from storage."
-            + " This is the memory retained by data read from storage (or 
cache) until it has been delivered to the"
-            + " Consumer Netty channel. Use O to disable")
-    private long managedLedgerMaxReadsInFlightSizeInMB = 0;
+    @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size 
in MB for bytes read from storage"
+            + " (or from the cache). This is the memory retained by data read 
from storage (or cache) until it has"
+            + " been delivered to the Consumer Netty channel. This provides 
backpressure for BookKeeper and tiered"
+            + " storage reads, preventing the broker from having too many 
concurrent reads and running into Out of"
+            + " Memory errors when there are multiple concurrent reads to 
multiple concurrent consumers.\n"
+            + "When left unset (empty), it defaults to the greater of 
dispatcherMaxReadSizeBytes and 15% of"
+            + " available JVM direct memory; dispatcherMaxReadSizeBytes is the 
minimum value so the limiter"
+            + " can never block the completion of a single read.\n"
+            + "Set to 0 to disable the feature.\n"
+            + "Set to a value greater than 0 to use that many MB.")
+    private Long managedLedgerMaxReadsInFlightSizeInMB = null;
 
     @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait 
for acquiring permits for max reads in "
             + "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) 
and the limit is reached.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 0d168b4ca57..3dece00e89a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 @CustomLog
@@ -90,18 +91,34 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
             );
         }
         
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
-        long managedLedgerMaxReadsInFlightSizeBytes = 
conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L;
-        if (managedLedgerMaxReadsInFlightSizeBytes > 0 && 
conf.getDispatcherMaxReadSizeBytes() > 0
-                && managedLedgerMaxReadsInFlightSizeBytes < 
conf.getDispatcherMaxReadSizeBytes()) {
-            log.warn()
-                    .attr("managedLedgerMaxReadsInFlightSizeInMB", 
conf.getManagedLedgerMaxReadsInFlightSizeInMB())
-                    .attr("dispatcherMaxReadSizeBytes", 
conf.getDispatcherMaxReadSizeBytes())
-                    .attr("minManagedLedgerMaxReadsInFlightSizeInMB",
-                            (conf.getDispatcherMaxReadSizeBytes() / (1024L * 
1024L)) + 1)
-                    .log("Invalid configuration:"
-                            + " managedLedgerMaxReadsInFlightSizeInMB in bytes"
-                            + " should be greater than"
-                            + " dispatcherMaxReadSizeBytes");
+        Long managedLedgerMaxReadsInFlightSizeInMB = 
conf.getManagedLedgerMaxReadsInFlightSizeInMB();
+        // A single dispatcher read can retain up to 
dispatcherMaxReadSizeBytes bytes. The in-flight reads
+        // limit must be at least this size, otherwise the limiter can block 
the completion of a single read.
+        long dispatcherMaxReadSizeBytes = conf.getDispatcherMaxReadSizeBytes();
+        final long managedLedgerMaxReadsInFlightSizeBytes;
+        if (managedLedgerMaxReadsInFlightSizeInMB == null) {
+            // When unset, default to 15% of the available JVM direct memory, 
but never below the maximum
+            // size of a single read (dispatcherMaxReadSizeBytes) so that the 
limiter can never block the
+            // completion of one read.
+            long fractionOfDirectMemory = (long) (0.15d * 
DirectMemoryUtils.jvmMaxDirectMemory());
+            managedLedgerMaxReadsInFlightSizeBytes = 
Math.max(fractionOfDirectMemory, dispatcherMaxReadSizeBytes);
+        } else {
+            // An explicit 0 disables the feature; an explicit value > 0 is 
used as-is.
+            managedLedgerMaxReadsInFlightSizeBytes = 
managedLedgerMaxReadsInFlightSizeInMB * 1024L * 1024L;
+            // Warn when the feature is enabled but manually configured below 
the size of a single read, since
+            // the limiter would then be unable to admit one read. Disabled 
(0) is ignored.
+            if (managedLedgerMaxReadsInFlightSizeBytes > 0 && 
dispatcherMaxReadSizeBytes > 0
+                    && managedLedgerMaxReadsInFlightSizeBytes < 
dispatcherMaxReadSizeBytes) {
+                log.warn()
+                        .attr("managedLedgerMaxReadsInFlightSizeInMB", 
managedLedgerMaxReadsInFlightSizeInMB)
+                        .attr("dispatcherMaxReadSizeBytes", 
dispatcherMaxReadSizeBytes)
+                        .attr("minManagedLedgerMaxReadsInFlightSizeInMB",
+                                (dispatcherMaxReadSizeBytes + (1024L * 1024L) 
- 1) / (1024L * 1024L))
+                        .log("Invalid configuration:"
+                                + " managedLedgerMaxReadsInFlightSizeInMB in 
bytes should be greater than or"
+                                + " equal to dispatcherMaxReadSizeBytes, 
otherwise the in-flight reads limiter"
+                                + " can block the completion of a single 
read.");
+            }
         }
         
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes);
         
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
index d78c78883b7..2419a86c7db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
@@ -100,7 +100,7 @@ public class KeySharedSubscriptionBrokerCacheTest extends 
ProducerConsumerBase {
         // Important: this is currently necessary to make use of cache for 
replay queue reads
         conf.setCacheEvictionByMarkDeletedPosition(true);
 
-        conf.setManagedLedgerMaxReadsInFlightSizeInMB(100);
+        conf.setManagedLedgerMaxReadsInFlightSizeInMB(100L);
         conf.setDispatcherRetryBackoffInitialTimeInMs(0);
         conf.setDispatcherRetryBackoffMaxTimeInMs(0);
         conf.setKeySharedUnblockingIntervalMs(0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
index 7a5b16df32a..f4160bc8a7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
@@ -94,7 +94,7 @@ public class KeySharedSubscriptionDisabledBrokerCacheTest 
extends ProducerConsum
         this.conf.setUnblockStuckSubscriptionEnabled(false);
         this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
         conf.setManagedLedgerCacheSizeMB(0);
-        conf.setManagedLedgerMaxReadsInFlightSizeInMB(0);
+        conf.setManagedLedgerMaxReadsInFlightSizeInMB(0L);
         conf.setDispatcherRetryBackoffInitialTimeInMs(0);
         conf.setDispatcherRetryBackoffMaxTimeInMs(0);
         conf.setKeySharedUnblockingIntervalMs(0);

Reply via email to