This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fe505670453 [improve][broker] Improve default limits for
acknowledgment state persistence and Key_Shared look-ahead (#25992)
fe505670453 is described below
commit fe50567045388006b53deffda61fd795f96e6165
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jun 11 01:16:49 2026 +0300
[improve][broker] Improve default limits for acknowledgment state
persistence and Key_Shared look-ahead (#25992)
---
conf/broker.conf | 33 ++++++++++++++------
conf/standalone.conf | 33 ++++++++++++++------
.../apache/pulsar/broker/ServiceConfiguration.java | 36 ++++++++++++++++------
3 files changed, 74 insertions(+), 28 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 4826bbeb5c7..c46bade3ab4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -446,7 +446,7 @@ maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# Setting this value to 0 will disable the limit calculated per consumer.
-keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
+keySharedLookAheadMsgInReplayThresholdPerConsumer=4000
# For Key_Shared subscriptions, if messages cannot be dispatched to consumers
due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will
continue reading more
@@ -454,9 +454,18 @@ keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
# messages reaches the calculated threshold.
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
-# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
-# Setting this value to 0 will disable the limit calculated per subscription.
-keySharedLookAheadMsgInReplayThresholdPerSubscription=20000
+# Keep this value relatively low to increase the cache hit ratio for Key_Shared replay queue reads; it should
+# also be less than 2 * managedLedgerMaxUnackedRangesToPersist.
+# However, with workloads that have low key cardinality (few distinct keys), a low value can leave some
+# consumers idle, since the dispatcher pauses once this threshold is reached and stops reading new messages
+# for the other consumers; increasing the value allows more messages to be in flight in such cases. Since the
+# effective threshold is the minimum of the per-consumer and per-subscription limits,
+# keySharedLookAheadMsgInReplayThresholdPerConsumer should also be increased, or set to 0 (disabled),
+# in that case.
+# Setting this value to 0 will disable the limit calculated per subscription. Disabling it might result in
+# the number of unacked ranges to persist exceeding managedLedgerMaxUnackedRangesToPersist, therefore
+# disabling is not recommended.
+keySharedLookAheadMsgInReplayThresholdPerSubscription=40000
# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false
@@ -1495,7 +1504,13 @@ managedLedgerMaxReadsInFlightSizeInMB=
# crashes.
# Truncations emit a WARN log and increment
# pulsar.broker.managed_ledger.cursor.persist.unacked_ranges.truncated.
-managedLedgerMaxUnackedRangesToPersist=10000
+# Note: when managedLedgerPersistIndividualAckAsLongArray is enabled (the default), the persisted size is
+# bounded by the backlog size (the range of entries the cursor spans), not by this max number of unacked
+# ranges. For BookKeeper ledger storage, with the default broker maxMessageSize and BookKeeper
+# nettyMaxFrameSizeBytes, the state fits a backlog of about 30M entries (excluding the
+# managedLedgerMaxBatchDeletedIndexToPersist storage, whose size is instead relative to the number of
+# acknowledgment holes).
+managedLedgerMaxUnackedRangesToPersist=200000
# Maximum number of partially acknowledged batch messages per subscription
that will have their batch
# deleted indexes persisted. Batch deleted index state is handled when
acknowledgmentAtBatchIndexLevelEnabled=true.
@@ -1504,7 +1519,7 @@ managedLedgerMaxUnackedRangesToPersist=10000
# deleted indexes will be cleared while redelivering the messages to consumers.
# Truncations emit a WARN log and increment
# pulsar.broker.managed_ledger.cursor.persist.batch_deleted_indexes.truncated.
-managedLedgerMaxBatchDeletedIndexToPersist=10000
+managedLedgerMaxBatchDeletedIndexToPersist=200000
# When storing acknowledgement state, choose a more compact serialization
format that stores
# individual acknowledgements as a bitmap which is serialized to an array of
long values. NOTE: This setting requires
@@ -1519,14 +1534,14 @@ managedLedgerUnackedRangesOpenCacheSetEnabled=true
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If
number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to
avoid additional data overhead into
# MetadataStore.
-managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000
+managedLedgerMaxUnackedRangesToPersistInMetadataStore=200000
# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD,
SNAPPY).
# If value is NONE, then save the ManagedCursorInfo bytes data directly
without compression.
# Using compression reduces the size of persistent cursor (subscription)
metadata. This enables using a higher
# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the
overall metadata stored in
# the metadata store such as ZooKeeper.
-managedCursorInfoCompressionType=NONE
+managedCursorInfoCompressionType=LZ4
# ManagedCursorInfo compression size threshold (bytes), only compress metadata
when origin size more then this value.
# 0 means compression will always apply.
@@ -1538,7 +1553,7 @@ managedCursorInfoCompressionThresholdInBytes=16384
# individual ledgers in BookKeeper or tiered storage, compression helps
prevent the metadata size from exceeding
# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also
reduces the overall metadata stored
# in the metadata store such as ZooKeeper.
-managedLedgerInfoCompressionType=NONE
+managedLedgerInfoCompressionType=LZ4
# ManagedLedgerInfo compression size threshold (bytes), only compress metadata
when origin size more then this value.
# 0 means compression will always apply.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2dba365a5a7..bbce4e908da 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -278,7 +278,7 @@ maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count,
keySharedLookAheadMsgInReplayThresholdPerSubscription).
# Setting this value to 0 will disable the limit calculated per consumer.
-keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
+keySharedLookAheadMsgInReplayThresholdPerConsumer=4000
# For Key_Shared subscriptions, if messages cannot be dispatched to consumers
due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will
continue reading more
@@ -286,9 +286,18 @@ keySharedLookAheadMsgInReplayThresholdPerConsumer=2000
# messages reaches the calculated threshold.
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count,
keySharedLookAheadMsgInReplayThresholdPerSubscription).
-# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
-# Setting this value to 0 will disable the limit calculated per subscription.
-keySharedLookAheadMsgInReplayThresholdPerSubscription=20000
+# Keep this value relatively low to increase the cache hit ratio for Key_Shared replay queue reads; it should
+# also be less than 2 * managedLedgerMaxUnackedRangesToPersist.
+# However, with workloads that have low key cardinality (few distinct keys), a low value can leave some
+# consumers idle, since the dispatcher pauses once this threshold is reached and stops reading new messages
+# for the other consumers; increasing the value allows more messages to be in flight in such cases. Since the
+# effective threshold is the minimum of the per-consumer and per-subscription limits,
+# keySharedLookAheadMsgInReplayThresholdPerConsumer should also be increased, or set to 0 (disabled),
+# in that case.
+# Setting this value to 0 will disable the limit calculated per subscription. Disabling it might result in
+# the number of unacked ranges to persist exceeding managedLedgerMaxUnackedRangesToPersist, therefore
+# disabling is not recommended.
+keySharedLookAheadMsgInReplayThresholdPerSubscription=40000
# Tick time to schedule task that checks topic publish rate limiting across
all topics
# Reducing to lower value can give more accuracy while throttling publish but
@@ -978,14 +987,20 @@ managedLedgerMaxSizePerLedgerMbytes=2048
# that were acknowledged. After the max number of ranges is reached, the
information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
-managedLedgerMaxUnackedRangesToPersist=10000
+# Note: when managedLedgerPersistIndividualAckAsLongArray is enabled (the default), the persisted size is
+# bounded by the backlog size (the range of entries the cursor spans), not by this max number of unacked
+# ranges. For BookKeeper ledger storage, with the default broker maxMessageSize and BookKeeper
+# nettyMaxFrameSizeBytes, the state fits a backlog of about 30M entries (excluding the
+# managedLedgerMaxBatchDeletedIndexToPersist storage, whose size is instead relative to the number of
+# acknowledgment holes).
+managedLedgerMaxUnackedRangesToPersist=200000
# Maximum number of partially acknowledged batch messages per subscription
that will have their batch
# deleted indexes persisted. Batch deleted index state is handled when
acknowledgmentAtBatchIndexLevelEnabled=true.
# When this limit is exceeded, remaining batch message containing the batch
deleted indexes will
# only be tracked in memory. In case of broker restarts or load balancing
events, the batch
# deleted indexes will be cleared while redelivering the messages to consumers.
-managedLedgerMaxBatchDeletedIndexToPersist=10000
+managedLedgerMaxBatchDeletedIndexToPersist=200000
# When storing acknowledgement state, choose a more compact serialization
format that stores
# individual acknowledgements as a bitmap which is serialized to an array of
long values. NOTE: This setting requires
@@ -1000,14 +1015,14 @@ managedLedgerUnackedRangesOpenCacheSetEnabled=true
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If
number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to
avoid additional data overhead into
# MetadataStore.
-managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000
+managedLedgerMaxUnackedRangesToPersistInMetadataStore=200000
# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD,
SNAPPY).
# If value is NONE, then save the ManagedCursorInfo bytes data directly
without compression.
# Using compression reduces the size of persistent cursor (subscription)
metadata. This enables using a higher
# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the
overall metadata stored in
# the metadata store such as ZooKeeper.
-managedCursorInfoCompressionType=NONE
+managedCursorInfoCompressionType=LZ4
# ManagedCursorInfo compression size threshold (bytes), only compress metadata
when origin size more then this value.
# 0 means compression will always apply.
@@ -1019,7 +1034,7 @@ managedCursorInfoCompressionThresholdInBytes=16384
# individual ledgers in BookKeeper or tiered storage, compression helps
prevent the metadata size from exceeding
# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also
reduces the overall metadata stored
# in the metadata store such as ZooKeeper.
-managedLedgerInfoCompressionType=NONE
+managedLedgerInfoCompressionType=LZ4
# ManagedLedgerInfo compression size threshold (bytes), only compress metadata
when origin size more then this value.
# 0 means compression will always apply.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 48be38ad809..ae4ec5777de 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1113,7 +1113,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Setting this value to 0 will disable the limit calculated per consumer.",
dynamic = true
)
- private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000;
+ private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 4000;
@FieldContext(
category = CATEGORY_POLICIES,
@@ -1124,11 +1124,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+ ".\n"
- + "This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
- + "Setting this value to 0 will disable the limit calculated per subscription.\n",
+ + "Keep this value relatively low to increase the cache hit ratio for Key_Shared replay"
+ + " queue reads; it should also be less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
+ + "However, with workloads that have low key cardinality (few distinct keys), a low value"
+ + " can leave some consumers idle, since the dispatcher pauses once this threshold is"
+ + " reached and stops reading new messages for the other consumers; increasing the value"
+ + " allows more messages to be in flight in such cases. Since the effective threshold is the"
+ + " minimum of the per-consumer and per-subscription limits,"
+ + " keySharedLookAheadMsgInReplayThresholdPerConsumer should also be increased, or set to 0"
+ + " (disabled), in that case.\n"
+ + "Setting this value to 0 will disable the limit calculated per subscription. Disabling"
+ + " it might result in the number of unacked ranges to persist exceeding"
+ + " managedLedgerMaxUnackedRangesToPersist, therefore disabling is not recommended.\n",
dynamic = true
)
- private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;
+ private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 40000;
@FieldContext(
category = CATEGORY_POLICIES,
@@ -2552,8 +2562,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ " messages are acknowledged is persisted by compressing in
`ranges` of messages"
+ " that were acknowledged. After the max number of ranges is
reached, the information"
+ " will only be tracked in memory and messages will be
redelivered in case of"
- + " crashes.")
- private int managedLedgerMaxUnackedRangesToPersist = 10000;
+ + " crashes.\n"
+ + "Note: when managedLedgerPersistIndividualAckAsLongArray is enabled (the default), the persisted"
+ + " size is bounded by the backlog size (the range of entries the cursor spans), not by this max"
+ + " number of unacked ranges. For BookKeeper ledger storage, with the default broker maxMessageSize"
+ + " and BookKeeper nettyMaxFrameSizeBytes, the state fits a backlog of about 30M entries (excluding"
+ + " the managedLedgerMaxBatchDeletedIndexToPersist storage, whose size is instead relative to the"
+ + " number of acknowledgment holes).")
+ private int managedLedgerMaxUnackedRangesToPersist = 200000;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Maximum number of partially acknowledged batch messages per
subscription that will have their batch "
@@ -2562,7 +2578,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ "When this limit is exceeded, remaining batch message
containing the batch deleted indexes will "
+ "only be tracked in memory. In case of broker restarts or
load balancing events, the batch "
+ "deleted indexes will be cleared while redelivering the
messages to consumers.")
- private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
+ private int managedLedgerMaxBatchDeletedIndexToPersist = 200000;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "When storing acknowledgement state, choose a more compact
serialization format that stores"
@@ -2589,7 +2605,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Max number of `acknowledgment holes` that can be stored in
MetadataStore.\n\n"
+ "If number of unack message range is higher than this
limit then broker will persist"
+ " unacked ranges into bookkeeper to avoid additional
data overhead into MetadataStore.")
- private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
+ private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 200000;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "When set to true, a BitSet will be used to track
acknowledged messages that come after the \"mark "
@@ -2668,7 +2684,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "ManagedLedgerInfo compression type, option values (NONE,
LZ4, ZLIB, ZSTD, SNAPPY). \n"
+ "If value is invalid or NONE, then save the
ManagedLedgerInfo bytes data directly.")
- private String managedLedgerInfoCompressionType = "NONE";
+ private String managedLedgerInfoCompressionType = "LZ4";
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "ManagedLedgerInfo compression size threshold (bytes), "
@@ -2680,7 +2696,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "ManagedCursorInfo compression type, option values (NONE,
LZ4, ZLIB, ZSTD, SNAPPY). \n"
+ "If value is NONE, then save the ManagedCursorInfo bytes
data directly.")
- private String managedCursorInfoCompressionType = "NONE";
+ private String managedCursorInfoCompressionType = "LZ4";
@FieldContext(category = CATEGORY_STORAGE_ML,