This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3582cfe3b6843b274ea5a6d1140bc869b82a6881
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 9 10:20:14 2025 +0300

    [improve][broker] Make maxBatchDeletedIndexToPersist configurable and 
document other related configs (#24392)
    
    (cherry picked from commit d1bca65f9b8907308efe69c887516d3c6b514a10)
---
 conf/broker.conf                                   | 34 +++++++++++++++-------
 conf/standalone.conf                               | 21 +++++++++++--
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 10 +++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 24 +++++++++++----
 .../pulsar/broker/service/BrokerService.java       |  2 ++
 5 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index b60870f109f..3c4e28555f6 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1244,6 +1244,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000
 # Max time before triggering a rollover on a cursor ledger
 managedLedgerCursorRolloverTimeInSeconds=14400
 
+# Maximum amount of memory used hold data read from storage (or from the 
cache).
+# This mechanism prevents the broker to have too many concurrent
+# reads from storage and fall into Out of Memory errors in case
+# of multiple concurrent reads to multiple concurrent consumers.
+# Set 0 in order to disable the feature.
+#
+managedLedgerMaxReadsInFlightSizeInMB=0
+
 # Max number of "acknowledgment holes" that are going to be persistently 
stored.
 # When acknowledging out of order, a consumer will leave holes that are 
supposed
 # to be quickly filled by acking all the messages. The information of which
@@ -1253,13 +1261,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400
 # crashes.
 managedLedgerMaxUnackedRangesToPersist=10000
 
-# Maximum amount of memory used hold data read from storage (or from the 
cache).
-# This mechanism prevents the broker to have too many concurrent
-# reads from storage and fall into Out of Memory errors in case
-# of multiple concurrent reads to multiple concurrent consumers.
-# Set 0 in order to disable the feature.
-#
-managedLedgerMaxReadsInFlightSizeInMB=0
+# Maximum number of partially acknowledged batch messages per subscription 
that will have their batch
+# deleted indexes persisted. Batch deleted index state is handled when 
acknowledgmentAtBatchIndexLevelEnabled=true.
+# When this limit is exceeded, remaining batch message containing the batch 
deleted indexes will
+# only be tracked in memory. In case of broker restarts or load balancing 
events, the batch
+# deleted indexes will be cleared while redelivering the messages to consumers.
+managedLedgerMaxBatchDeletedIndexToPersist=10000
+
+# When storing acknowledgement state, choose a more compact serialization 
format that stores
+# individual acknowledgements as a bitmap which is serialized to an array of 
long values. NOTE: This setting requires
+# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
+managedLedgerPersistIndividualAckAsLongArray=false
+
+# When set to true, a BitSet will be used to track acknowledged messages that 
come after the "mark delete position"
+# for each subscription. RoaringBitmap is used as a memory efficient BitSet 
implementation for the acknowledged
+# messages tracking. Unacknowledged ranges are the message ranges excluding 
the acknowledged messages.
+managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
 # Max number of "acknowledgment holes" that can be stored in MetadataStore. If 
number of unack message range is higher
 # than this limit then broker will persist unacked ranges into bookkeeper to 
avoid additional data overhead into
@@ -1721,9 +1738,6 @@ narExtractionDirectory=
 # Maximum prefetch rounds for ledger reading for offloading
 managedLedgerOffloadPrefetchRounds=1
 
-# Use Open Range-Set to cache unacked messages
-managedLedgerUnackedRangesOpenCacheSetEnabled=true
-
 # For Amazon S3 ledger offload, AWS region
 s3ManagedLedgerOffloadRegion=
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 12c479fce6d..5d3f37ec1c8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -801,6 +801,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048
 # crashes.
 managedLedgerMaxUnackedRangesToPersist=10000
 
+# Maximum number of partially acknowledged batch messages per subscription 
that will have their batch
+# deleted indexes persisted. Batch deleted index state is handled when 
acknowledgmentAtBatchIndexLevelEnabled=true.
+# When this limit is exceeded, remaining batch message containing the batch 
deleted indexes will
+# only be tracked in memory. In case of broker restarts or load balancing 
events, the batch
+# deleted indexes will be cleared while redelivering the messages to consumers.
+managedLedgerMaxBatchDeletedIndexToPersist=10000
+
+# When storing acknowledgement state, choose a more compact serialization 
format that stores
+# individual acknowledgements as a bitmap which is serialized to an array of 
long values. NOTE: This setting requires
+# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
+managedLedgerPersistIndividualAckAsLongArray=false
+
+# When set to true, a BitSet will be used to track acknowledged messages that 
come after the "mark delete position"
+# for each subscription. RoaringBitmap is used as a memory efficient BitSet 
implementation for the acknowledged
+# messages tracking. Unacknowledged ranges are the message ranges excluding 
the acknowledged messages.
+managedLedgerUnackedRangesOpenCacheSetEnabled=true
+
 # Max number of "acknowledgment holes" that can be stored in MetadataStore. If 
number of unack message range is higher
 # than this limit then broker will persist unacked ranges into bookkeeper to 
avoid additional data overhead into
 # MetadataStore.
@@ -835,9 +852,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000
 # Maximum backlog entry difference to prevent caching entries that can't be 
reused.
 managedLedgerMaxBacklogBetweenCursorsForCaching=1000
 
-# Use Open Range-Set to cache unacked messages
-managedLedgerUnackedRangesOpenCacheSetEnabled=true
-
 # Managed ledger prometheus stats latency rollover seconds (default: 60s)
 managedLedgerPrometheusStatsLatencyRolloverSeconds=60
 
@@ -1322,3 +1336,4 @@ disableBrokerInterceptors=true
 
 # Whether retain null-key message during topic compaction
 topicCompactionRetainNullKey=false
+
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 89cc7e4fde4..924cc211e54 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -488,6 +488,16 @@ public class ManagedLedgerConfig {
         return maxBatchDeletedIndexToPersist;
     }
 
+    /**
+     * Set max batch deleted index that will be persisted and recovered.
+     *
+     * @param maxBatchDeletedIndexToPersist
+     *            max batch deleted index that will be persisted and recovered.
+     */
+    public void setMaxBatchDeletedIndexToPersist(int 
maxBatchDeletedIndexToPersist) {
+        this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist;
+    }
+
     public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
         return persistentUnackedRangesWithMultipleEntriesEnabled;
     }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c25d9e9aae1..2e51bec1cda 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2195,10 +2195,22 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " will only be tracked in memory and messages will be 
redelivered in case of"
             + " crashes.")
     private int managedLedgerMaxUnackedRangesToPersist = 10000;
-    @FieldContext(
-            category = CATEGORY_STORAGE_ML,
-            doc = "Whether persist cursor ack stats as long arrays, which will 
compress the data and reduce GC rate")
+
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "Maximum number of partially acknowledged batch messages per 
subscription that will have their batch "
+                + "deleted indexes persisted. Batch deleted index state is 
handled when "
+                + "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n"
+                + "When this limit is exceeded, remaining batch message 
containing the batch deleted indexes will "
+                + "only be tracked in memory. In case of broker restarts or 
load balancing events, the batch "
+                + "deleted indexes will be cleared while redelivering the 
messages to consumers.")
+    private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
+
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "When storing acknowledgement state, choose a more compact 
serialization format that stores"
+                    + " individual acknowledgements as a bitmap which is 
serialized to an array of long values.\n\n"
+                    + "NOTE: This setting requires 
managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.")
     private boolean managedLedgerPersistIndividualAckAsLongArray = false;
+
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "If enabled, the maximum \"acknowledgment holes\" will not be 
limited and \"acknowledgment holes\" "
@@ -2221,8 +2233,10 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
     @FieldContext(
             category = CATEGORY_STORAGE_OFFLOADING,
-            doc = "Use Open Range-Set to cache unacked messages (it is memory 
efficient but it can take more cpu)"
-        )
+            doc = "When set to true, a BitSet will be used to track 
acknowledged messages that come after the \"mark "
+                    + "delete position\" for each 
subscription.\n\nRoaringBitmap is used as a memory efficient BitSet "
+                    + "implementation for the acknowledged messages tracking. 
Unacknowledged ranges are the message "
+                    + "ranges excluding the acknowledged messages.")
     private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
     @FieldContext(
         dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d1d60095b5b..2427a199f94 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1983,6 +1983,8 @@ public class BrokerService implements Closeable {
 
             managedLedgerConfig
                     
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+            managedLedgerConfig.setMaxBatchDeletedIndexToPersist(
+                    
serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist());
             managedLedgerConfig
                     
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
             
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(

Reply via email to