This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fe505670453 [improve][broker] Improve default limits for acknowledgment state persistence and Key_Shared look-ahead (#25992)
fe505670453 is described below

commit fe50567045388006b53deffda61fd795f96e6165
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jun 11 01:16:49 2026 +0300

    [improve][broker] Improve default limits for acknowledgment state persistence and Key_Shared look-ahead (#25992)
---
 conf/broker.conf                                   | 33 ++++++++++++++------
 conf/standalone.conf                               | 33 ++++++++++++++------
 .../apache/pulsar/broker/ServiceConfiguration.java | 36 ++++++++++++++++------
 3 files changed, 74 insertions(+), 28 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 4826bbeb5c7..c46bade3ab4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -446,7 +446,7 @@ maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 # Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
 # connected consumer count, 
keySharedLookAheadMsgInReplayThresholdPerSubscription).
 # Setting this value to 0 will disable the limit calculated per consumer.
-keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
+keySharedLookAheadMsgInReplayThresholdPerConsumer=4000
 
 # For Key_Shared subscriptions, if messages cannot be dispatched to consumers 
due to a slow consumer
 # or a blocked key hash (because of ordering constraints), the broker will 
continue reading more
@@ -454,9 +454,18 @@ keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
 # messages reaches the calculated threshold.
 # Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
 # connected consumer count, 
keySharedLookAheadMsgInReplayThresholdPerSubscription).
-# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
-# Setting this value to 0 will disable the limit calculated per subscription.
-keySharedLookAheadMsgInReplayThresholdPerSubscription=20000
+# Keep this value relatively low to increase the cache hit ratio for Key_Shared replay queue reads; it should
+# also be less than 2 * managedLedgerMaxUnackedRangesToPersist.
+# However, with workloads that have low key cardinality (few distinct keys), a low value can leave some
+# consumers idle, since the dispatcher pauses once this threshold is reached and stops reading new messages
+# for the other consumers; increasing the value allows more messages to be in flight in such cases. Since the
+# effective threshold is the minimum of the per-consumer and per-subscription limits,
+# keySharedLookAheadMsgInReplayThresholdPerConsumer should also be increased, or set to 0 (disabled),
+# in that case.
+# Setting this value to 0 will disable the limit calculated per subscription. Disabling it might result in
+# the number of unacked ranges to persist exceeding managedLedgerMaxUnackedRangesToPersist, therefore
+# disabling is not recommended.
+keySharedLookAheadMsgInReplayThresholdPerSubscription=40000
 
 # Broker periodically checks if subscription is stuck and unblock if flag is 
enabled. (Default is disabled)
 unblockStuckSubscriptionEnabled=false
@@ -1495,7 +1504,13 @@ managedLedgerMaxReadsInFlightSizeInMB=
 # crashes.
 # Truncations emit a WARN log and increment
 # pulsar.broker.managed_ledger.cursor.persist.unacked_ranges.truncated.
-managedLedgerMaxUnackedRangesToPersist=10000
+# Note: when managedLedgerPersistIndividualAckAsLongArray is enabled (the 
default), the persisted size is
+# bounded by the backlog size (the range of entries the cursor spans), not by 
this max number of unacked
+# ranges. For BookKeeper ledger storage, with the default broker 
maxMessageSize and BookKeeper
+# nettyMaxFrameSizeBytes, the state fits a backlog of about 30M entries 
(excluding the
+# managedLedgerMaxBatchDeletedIndexToPersist storage, whose size is instead 
relative to the number of
+# acknowledgment holes).
+managedLedgerMaxUnackedRangesToPersist=200000
 
 # Maximum number of partially acknowledged batch messages per subscription 
that will have their batch
 # deleted indexes persisted. Batch deleted index state is handled when 
acknowledgmentAtBatchIndexLevelEnabled=true.
@@ -1504,7 +1519,7 @@ managedLedgerMaxUnackedRangesToPersist=10000
 # deleted indexes will be cleared while redelivering the messages to consumers.
 # Truncations emit a WARN log and increment
 # pulsar.broker.managed_ledger.cursor.persist.batch_deleted_indexes.truncated.
-managedLedgerMaxBatchDeletedIndexToPersist=10000
+managedLedgerMaxBatchDeletedIndexToPersist=200000
 
 # When storing acknowledgement state, choose a more compact serialization 
format that stores
 # individual acknowledgements as a bitmap which is serialized to an array of 
long values. NOTE: This setting requires
@@ -1519,14 +1534,14 @@ managedLedgerUnackedRangesOpenCacheSetEnabled=true
 # Max number of "acknowledgment holes" that can be stored in MetadataStore. If 
number of unack message range is higher
 # than this limit then broker will persist unacked ranges into bookkeeper to 
avoid additional data overhead into
 # MetadataStore.
-managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000
+managedLedgerMaxUnackedRangesToPersistInMetadataStore=200000
 
 # ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, 
SNAPPY).
 # If value is NONE, then save the ManagedCursorInfo bytes data directly 
without compression.
 # Using compression reduces the size of persistent cursor (subscription) 
metadata. This enables using a higher
 # managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the 
overall metadata stored in
 # the metadata store such as ZooKeeper.
-managedCursorInfoCompressionType=NONE
+managedCursorInfoCompressionType=LZ4
 
 # ManagedCursorInfo compression size threshold (bytes), only compress metadata 
when origin size more then this value.
 # 0 means compression will always apply.
@@ -1538,7 +1553,7 @@ managedCursorInfoCompressionThresholdInBytes=16384
 # individual ledgers in BookKeeper or tiered storage, compression helps 
prevent the metadata size from exceeding
 # the maximum size of a metadata store entry (ZNode in ZooKeeper). This also 
reduces the overall metadata stored
 # in the metadata store such as ZooKeeper.
-managedLedgerInfoCompressionType=NONE
+managedLedgerInfoCompressionType=LZ4
 
 # ManagedLedgerInfo compression size threshold (bytes), only compress metadata 
when origin size more then this value.
 # 0 means compression will always apply.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2dba365a5a7..bbce4e908da 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -278,7 +278,7 @@ maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 # Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
 # connected consumer count, 
keySharedLookAheadMsgInReplayThresholdPerSubscription).
 # Setting this value to 0 will disable the limit calculated per consumer.
-keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
+keySharedLookAheadMsgInReplayThresholdPerConsumer=4000
 
 # For Key_Shared subscriptions, if messages cannot be dispatched to consumers 
due to a slow consumer
 # or a blocked key hash (because of ordering constraints), the broker will 
continue reading more
@@ -286,9 +286,18 @@ keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
 # messages reaches the calculated threshold.
 # Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
 # connected consumer count, 
keySharedLookAheadMsgInReplayThresholdPerSubscription).
-# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
-# Setting this value to 0 will disable the limit calculated per subscription.
-keySharedLookAheadMsgInReplayThresholdPerSubscription=20000
+# Keep this value relatively low to increase the cache hit ratio for Key_Shared replay queue reads; it should
+# also be less than 2 * managedLedgerMaxUnackedRangesToPersist.
+# However, with workloads that have low key cardinality (few distinct keys), a low value can leave some
+# consumers idle, since the dispatcher pauses once this threshold is reached and stops reading new messages
+# for the other consumers; increasing the value allows more messages to be in flight in such cases. Since the
+# effective threshold is the minimum of the per-consumer and per-subscription limits,
+# keySharedLookAheadMsgInReplayThresholdPerConsumer should also be increased, or set to 0 (disabled),
+# in that case.
+# Setting this value to 0 will disable the limit calculated per subscription. Disabling it might result in
+# the number of unacked ranges to persist exceeding managedLedgerMaxUnackedRangesToPersist, therefore
+# disabling is not recommended.
+keySharedLookAheadMsgInReplayThresholdPerSubscription=40000
 
 # Tick time to schedule task that checks topic publish rate limiting across 
all topics
 # Reducing to lower value can give more accuracy while throttling publish but
@@ -978,14 +987,20 @@ managedLedgerMaxSizePerLedgerMbytes=2048
 # that were acknowledged. After the max number of ranges is reached, the 
information
 # will only be tracked in memory and messages will be redelivered in case of
 # crashes.
-managedLedgerMaxUnackedRangesToPersist=10000
+# Note: when managedLedgerPersistIndividualAckAsLongArray is enabled (the 
default), the persisted size is
+# bounded by the backlog size (the range of entries the cursor spans), not by 
this max number of unacked
+# ranges. For BookKeeper ledger storage, with the default broker 
maxMessageSize and BookKeeper
+# nettyMaxFrameSizeBytes, the state fits a backlog of about 30M entries 
(excluding the
+# managedLedgerMaxBatchDeletedIndexToPersist storage, whose size is instead 
relative to the number of
+# acknowledgment holes).
+managedLedgerMaxUnackedRangesToPersist=200000
 
 # Maximum number of partially acknowledged batch messages per subscription 
that will have their batch
 # deleted indexes persisted. Batch deleted index state is handled when 
acknowledgmentAtBatchIndexLevelEnabled=true.
 # When this limit is exceeded, remaining batch message containing the batch 
deleted indexes will
 # only be tracked in memory. In case of broker restarts or load balancing 
events, the batch
 # deleted indexes will be cleared while redelivering the messages to consumers.
-managedLedgerMaxBatchDeletedIndexToPersist=10000
+managedLedgerMaxBatchDeletedIndexToPersist=200000
 
 # When storing acknowledgement state, choose a more compact serialization 
format that stores
 # individual acknowledgements as a bitmap which is serialized to an array of 
long values. NOTE: This setting requires
@@ -1000,14 +1015,14 @@ managedLedgerUnackedRangesOpenCacheSetEnabled=true
 # Max number of "acknowledgment holes" that can be stored in MetadataStore. If 
number of unack message range is higher
 # than this limit then broker will persist unacked ranges into bookkeeper to 
avoid additional data overhead into
 # MetadataStore.
-managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000
+managedLedgerMaxUnackedRangesToPersistInMetadataStore=200000
 
 # ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, 
SNAPPY).
 # If value is NONE, then save the ManagedCursorInfo bytes data directly 
without compression.
 # Using compression reduces the size of persistent cursor (subscription) 
metadata. This enables using a higher
 # managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the 
overall metadata stored in
 # the metadata store such as ZooKeeper.
-managedCursorInfoCompressionType=NONE
+managedCursorInfoCompressionType=LZ4
 
 # ManagedCursorInfo compression size threshold (bytes), only compress metadata 
when origin size more then this value.
 # 0 means compression will always apply.
@@ -1019,7 +1034,7 @@ managedCursorInfoCompressionThresholdInBytes=16384
 # individual ledgers in BookKeeper or tiered storage, compression helps 
prevent the metadata size from exceeding
 # the maximum size of a metadata store entry (ZNode in ZooKeeper). This also 
reduces the overall metadata stored
 # in the metadata store such as ZooKeeper.
-managedLedgerInfoCompressionType=NONE
+managedLedgerInfoCompressionType=LZ4
 
 # ManagedLedgerInfo compression size threshold (bytes), only compress metadata 
when origin size more then this value.
 # 0 means compression will always apply.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 48be38ad809..ae4ec5777de 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1113,7 +1113,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "Setting this value to 0 will disable the limit 
calculated per consumer.",
             dynamic = true
     )
-    private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000;
+    private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 4000;
 
     @FieldContext(
             category = CATEGORY_POLICIES,
@@ -1124,11 +1124,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
                     + " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
                     + ".\n"
-                    + "This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
-                    + "Setting this value to 0 will disable the limit calculated per subscription.\n",
+                    + "Keep this value relatively low to increase the cache hit ratio for Key_Shared replay"
+                    + " queue reads; it should also be less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
+                    + "However, with workloads that have low key cardinality (few distinct keys), a low value"
+                    + " can leave some consumers idle, since the dispatcher pauses once this threshold is"
+                    + " reached and stops reading new messages for the other consumers; increasing the value"
+                    + " allows more messages to be in flight in such cases. Since the effective threshold is the"
+                    + " minimum of the per-consumer and per-subscription limits,"
+                    + " keySharedLookAheadMsgInReplayThresholdPerConsumer should also be increased, or set to 0"
+                    + " (disabled), in that case.\n"
+                    + "Setting this value to 0 will disable the limit calculated per subscription. Disabling"
+                    + " it might result in the number of unacked ranges to persist exceeding"
+                    + " managedLedgerMaxUnackedRangesToPersist, therefore disabling is not recommended.\n",
             dynamic = true
     )
-    private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;
+    private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 40000;
 
     @FieldContext(
             category = CATEGORY_POLICIES,
@@ -2552,8 +2562,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " messages are acknowledged is persisted by compressing in 
`ranges` of messages"
             + " that were acknowledged. After the max number of ranges is 
reached, the information"
             + " will only be tracked in memory and messages will be 
redelivered in case of"
-            + " crashes.")
-    private int managedLedgerMaxUnackedRangesToPersist = 10000;
+            + " crashes.\n"
+            + "Note: when managedLedgerPersistIndividualAckAsLongArray is 
enabled (the default), the persisted"
+            + " size is bounded by the backlog size (the range of entries the 
cursor spans), not by this max"
+            + " number of unacked ranges. For BookKeeper ledger storage, with 
the default broker maxMessageSize"
+            + " and BookKeeper nettyMaxFrameSizeBytes, the state fits a 
backlog of about 30M entries (excluding"
+            + " the managedLedgerMaxBatchDeletedIndexToPersist storage, whose 
size is instead relative to the"
+            + " number of acknowledgment holes).")
+    private int managedLedgerMaxUnackedRangesToPersist = 200000;
 
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "Maximum number of partially acknowledged batch messages per 
subscription that will have their batch "
@@ -2562,7 +2578,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
                 + "When this limit is exceeded, remaining batch message 
containing the batch deleted indexes will "
                 + "only be tracked in memory. In case of broker restarts or 
load balancing events, the batch "
                 + "deleted indexes will be cleared while redelivering the 
messages to consumers.")
-    private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
+    private int managedLedgerMaxBatchDeletedIndexToPersist = 200000;
 
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "When storing acknowledgement state, choose a more compact 
serialization format that stores"
@@ -2589,7 +2605,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             doc = "Max number of `acknowledgment holes` that can be stored in 
MetadataStore.\n\n"
                     + "If number of unack message range is higher than this 
limit then broker will persist"
                     + " unacked ranges into bookkeeper to avoid additional 
data overhead into MetadataStore.")
-    private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
+    private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 200000;
     @FieldContext(
             category = CATEGORY_STORAGE_OFFLOADING,
             doc = "When set to true, a BitSet will be used to track 
acknowledged messages that come after the \"mark "
@@ -2668,7 +2684,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "ManagedLedgerInfo compression type, option values (NONE, 
LZ4, ZLIB, ZSTD, SNAPPY). \n"
                     + "If value is invalid or NONE, then save the 
ManagedLedgerInfo bytes data directly.")
-    private String managedLedgerInfoCompressionType = "NONE";
+    private String managedLedgerInfoCompressionType = "LZ4";
 
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "ManagedLedgerInfo compression size threshold (bytes), "
@@ -2680,7 +2696,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "ManagedCursorInfo compression type, option values (NONE, 
LZ4, ZLIB, ZSTD, SNAPPY). \n"
                     + "If value is NONE, then save the ManagedCursorInfo bytes 
data directly.")
-    private String managedCursorInfoCompressionType = "NONE";
+    private String managedCursorInfoCompressionType = "LZ4";
 
 
     @FieldContext(category = CATEGORY_STORAGE_ML,

Reply via email to