This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 083fd4cb6fa [improve][broker] Default managedLedgerMaxReadsInFlightSizeInMB to 15% of direct memory (#25979)
083fd4cb6fa is described below
commit 083fd4cb6fa7585268a0534ecbc70746348e316b
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Jun 9 10:00:56 2026 +0300
[improve][broker] Default managedLedgerMaxReadsInFlightSizeInMB to 15% of direct memory (#25979)
---
conf/broker.conf | 19 ++++++----
conf/standalone.conf | 13 +++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 15 +++++---
.../pulsar/broker/ManagedLedgerClientFactory.java | 41 +++++++++++++++-------
.../api/KeySharedSubscriptionBrokerCacheTest.java | 2 +-
...ySharedSubscriptionDisabledBrokerCacheTest.java | 2 +-
6 files changed, 67 insertions(+), 25 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 8ed01cc8656..4826bbeb5c7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1473,13 +1473,18 @@ managedLedgerCursorMaxEntriesPerLedger=50000
# Max time before triggering a rollover on a cursor ledger
managedLedgerCursorRolloverTimeInSeconds=14400
-# Maximum amount of memory used hold data read from storage (or from the cache).
-# This mechanism prevents the broker to have too many concurrent
-# reads from storage and fall into Out of Memory errors in case
-# of multiple concurrent reads to multiple concurrent consumers.
-# Set 0 in order to disable the feature.
-#
-managedLedgerMaxReadsInFlightSizeInMB=0
+# Maximum buffer size in MB for bytes read from storage (or from the cache).
+# This is the memory retained by data read from storage (or cache) until it has been
+# delivered to the Consumer Netty channel. This provides backpressure for BookKeeper and
+# tiered storage reads, preventing the broker from having too many concurrent reads and
+# running into Out of Memory errors when there are multiple concurrent reads to multiple
+# concurrent consumers.
+# When left unset (empty), it defaults to the greater of dispatcherMaxReadSizeBytes and 15% of available
+# JVM direct memory; dispatcherMaxReadSizeBytes is the minimum value so the limiter can never block the
+# completion of a single read.
+# Set to 0 to disable the feature.
+# Set to a value greater than 0 to use that many MB.
+managedLedgerMaxReadsInFlightSizeInMB=
# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 38c5d1a5b89..2dba365a5a7 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -914,6 +914,19 @@ managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed=true
# The default value is 2 * managedLedgerCacheEvictionTimeThresholdMillis.
managedLedgerContinueCachingAddedEntriesAfterLastActiveCursorLeavesMillis=
+# Maximum buffer size in MB for bytes read from storage (or from the cache).
+# This is the memory retained by data read from storage (or cache) until it has been
+# delivered to the Consumer Netty channel. This provides backpressure for BookKeeper and
+# tiered storage reads, preventing the broker from having too many concurrent reads and
+# running into Out of Memory errors when there are multiple concurrent reads to multiple
+# concurrent consumers.
+# When left unset (empty), it defaults to the greater of dispatcherMaxReadSizeBytes and 15% of available
+# JVM direct memory; dispatcherMaxReadSizeBytes is the minimum value so the limiter can never block the
+# completion of a single read.
+# Set to 0 to disable the feature.
+# Set to a value greater than 0 to use that many MB.
+managedLedgerMaxReadsInFlightSizeInMB=
+
# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'
# and thus should be set as inactive.
# This has no effect when cacheEvictionByExpectedReadCount is enabled.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 7771b43059f..48be38ad809 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2335,10 +2335,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
- @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size
for bytes read from storage."
- + " This is the memory retained by data read from storage (or
cache) until it has been delivered to the"
- + " Consumer Netty channel. Use O to disable")
- private long managedLedgerMaxReadsInFlightSizeInMB = 0;
+ @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size
in MB for bytes read from storage"
+ + " (or from the cache). This is the memory retained by data read
from storage (or cache) until it has"
+ + " been delivered to the Consumer Netty channel. This provides
backpressure for BookKeeper and tiered"
+ + " storage reads, preventing the broker from having too many
concurrent reads and running into Out of"
+ + " Memory errors when there are multiple concurrent reads to
multiple concurrent consumers.\n"
+ + "When left unset (empty), it defaults to the greater of
dispatcherMaxReadSizeBytes and 15% of"
+ + " available JVM direct memory; dispatcherMaxReadSizeBytes is the
minimum value so the limiter"
+ + " can never block the completion of a single read.\n"
+ + "Set to 0 to disable the feature.\n"
+ + "Set to a value greater than 0 to use that many MB.")
+ private Long managedLedgerMaxReadsInFlightSizeInMB = null;
@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait
for acquiring permits for max reads in "
+ "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0)
and the limit is reached.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 0d168b4ca57..3dece00e89a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@CustomLog
@@ -90,18 +91,34 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
);
}
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
- long managedLedgerMaxReadsInFlightSizeBytes =
conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L;
- if (managedLedgerMaxReadsInFlightSizeBytes > 0 &&
conf.getDispatcherMaxReadSizeBytes() > 0
- && managedLedgerMaxReadsInFlightSizeBytes <
conf.getDispatcherMaxReadSizeBytes()) {
- log.warn()
- .attr("managedLedgerMaxReadsInFlightSizeInMB",
conf.getManagedLedgerMaxReadsInFlightSizeInMB())
- .attr("dispatcherMaxReadSizeBytes",
conf.getDispatcherMaxReadSizeBytes())
- .attr("minManagedLedgerMaxReadsInFlightSizeInMB",
- (conf.getDispatcherMaxReadSizeBytes() / (1024L *
1024L)) + 1)
- .log("Invalid configuration:"
- + " managedLedgerMaxReadsInFlightSizeInMB in bytes"
- + " should be greater than"
- + " dispatcherMaxReadSizeBytes");
+ Long managedLedgerMaxReadsInFlightSizeInMB =
conf.getManagedLedgerMaxReadsInFlightSizeInMB();
+ // A single dispatcher read can retain up to
dispatcherMaxReadSizeBytes bytes. The in-flight reads
+ // limit must be at least this size, otherwise the limiter can block
the completion of a single read.
+ long dispatcherMaxReadSizeBytes = conf.getDispatcherMaxReadSizeBytes();
+ final long managedLedgerMaxReadsInFlightSizeBytes;
+ if (managedLedgerMaxReadsInFlightSizeInMB == null) {
+ // When unset, default to 15% of the available JVM direct memory,
but never below the maximum
+ // size of a single read (dispatcherMaxReadSizeBytes) so that the
limiter can never block the
+ // completion of one read.
+ long fractionOfDirectMemory = (long) (0.15d *
DirectMemoryUtils.jvmMaxDirectMemory());
+ managedLedgerMaxReadsInFlightSizeBytes =
Math.max(fractionOfDirectMemory, dispatcherMaxReadSizeBytes);
+ } else {
+ // An explicit 0 disables the feature; an explicit value > 0 is
used as-is.
+ managedLedgerMaxReadsInFlightSizeBytes =
managedLedgerMaxReadsInFlightSizeInMB * 1024L * 1024L;
+ // Warn when the feature is enabled but manually configured below
the size of a single read, since
+ // the limiter would then be unable to admit one read. Disabled
(0) is ignored.
+ if (managedLedgerMaxReadsInFlightSizeBytes > 0 &&
dispatcherMaxReadSizeBytes > 0
+ && managedLedgerMaxReadsInFlightSizeBytes <
dispatcherMaxReadSizeBytes) {
+ log.warn()
+ .attr("managedLedgerMaxReadsInFlightSizeInMB",
managedLedgerMaxReadsInFlightSizeInMB)
+ .attr("dispatcherMaxReadSizeBytes",
dispatcherMaxReadSizeBytes)
+ .attr("minManagedLedgerMaxReadsInFlightSizeInMB",
+ (dispatcherMaxReadSizeBytes + (1024L * 1024L)
- 1) / (1024L * 1024L))
+ .log("Invalid configuration:"
+ + " managedLedgerMaxReadsInFlightSizeInMB in
bytes should be greater than or"
+ + " equal to dispatcherMaxReadSizeBytes,
otherwise the in-flight reads limiter"
+ + " can block the completion of a single
read.");
+ }
}
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes);
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
index d78c78883b7..2419a86c7db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java
@@ -100,7 +100,7 @@ public class KeySharedSubscriptionBrokerCacheTest extends ProducerConsumerBase {
// Important: this is currently necessary to make use of cache for
replay queue reads
conf.setCacheEvictionByMarkDeletedPosition(true);
- conf.setManagedLedgerMaxReadsInFlightSizeInMB(100);
+ conf.setManagedLedgerMaxReadsInFlightSizeInMB(100L);
conf.setDispatcherRetryBackoffInitialTimeInMs(0);
conf.setDispatcherRetryBackoffMaxTimeInMs(0);
conf.setKeySharedUnblockingIntervalMs(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
index 7a5b16df32a..f4160bc8a7a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java
@@ -94,7 +94,7 @@ public class KeySharedSubscriptionDisabledBrokerCacheTest extends ProducerConsum
this.conf.setUnblockStuckSubscriptionEnabled(false);
this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
conf.setManagedLedgerCacheSizeMB(0);
- conf.setManagedLedgerMaxReadsInFlightSizeInMB(0);
+ conf.setManagedLedgerMaxReadsInFlightSizeInMB(0L);
conf.setDispatcherRetryBackoffInitialTimeInMs(0);
conf.setDispatcherRetryBackoffMaxTimeInMs(0);
conf.setKeySharedUnblockingIntervalMs(0);