This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3582cfe3b6843b274ea5a6d1140bc869b82a6881 Author: Lari Hotari <[email protected]> AuthorDate: Mon Jun 9 10:20:14 2025 +0300 [improve][broker] Make maxBatchDeletedIndexToPersist configurable and document other related configs (#24392) (cherry picked from commit d1bca65f9b8907308efe69c887516d3c6b514a10) --- conf/broker.conf | 34 +++++++++++++++------- conf/standalone.conf | 21 +++++++++++-- .../bookkeeper/mledger/ManagedLedgerConfig.java | 10 +++++++ .../apache/pulsar/broker/ServiceConfiguration.java | 24 +++++++++++---- .../pulsar/broker/service/BrokerService.java | 2 ++ 5 files changed, 73 insertions(+), 18 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index b60870f109f..3c4e28555f6 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1244,6 +1244,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000 # Max time before triggering a rollover on a cursor ledger managedLedgerCursorRolloverTimeInSeconds=14400 +# Maximum amount of memory used hold data read from storage (or from the cache). +# This mechanism prevents the broker to have too many concurrent +# reads from storage and fall into Out of Memory errors in case +# of multiple concurrent reads to multiple concurrent consumers. +# Set 0 in order to disable the feature. +# +managedLedgerMaxReadsInFlightSizeInMB=0 + # Max number of "acknowledgment holes" that are going to be persistently stored. # When acknowledging out of order, a consumer will leave holes that are supposed # to be quickly filled by acking all the messages. The information of which @@ -1253,13 +1261,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400 # crashes. managedLedgerMaxUnackedRangesToPersist=10000 -# Maximum amount of memory used hold data read from storage (or from the cache). -# This mechanism prevents the broker to have too many concurrent -# reads from storage and fall into Out of Memory errors in case -# of multiple concurrent reads to multiple concurrent consumers. -# Set 0 in order to disable the feature. 
-# -managedLedgerMaxReadsInFlightSizeInMB=0 +# Maximum number of partially acknowledged batch messages per subscription that will have their batch +# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true. +# When this limit is exceeded, remaining batch messages containing the batch deleted indexes will +# only be tracked in memory. In case of broker restarts or load balancing events, the batch +# deleted indexes will be cleared while redelivering the messages to consumers. +managedLedgerMaxBatchDeletedIndexToPersist=10000 + +# When storing acknowledgement state, choose a more compact serialization format that stores +# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires +# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective. +managedLedgerPersistIndividualAckAsLongArray=false + +# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position" +# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged +# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages. +managedLedgerUnackedRangesOpenCacheSetEnabled=true # Max number of "acknowledgment holes" that can be stored in MetadataStore. 
If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into @@ -1721,9 +1738,6 @@ narExtractionDirectory= # Maximum prefetch rounds for ledger reading for offloading managedLedgerOffloadPrefetchRounds=1 -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - # For Amazon S3 ledger offload, AWS region s3ManagedLedgerOffloadRegion= diff --git a/conf/standalone.conf b/conf/standalone.conf index 12c479fce6d..5d3f37ec1c8 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -801,6 +801,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048 # crashes. managedLedgerMaxUnackedRangesToPersist=10000 +# Maximum number of partially acknowledged batch messages per subscription that will have their batch +# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true. +# When this limit is exceeded, remaining batch messages containing the batch deleted indexes will +# only be tracked in memory. In case of broker restarts or load balancing events, the batch +# deleted indexes will be cleared while redelivering the messages to consumers. +managedLedgerMaxBatchDeletedIndexToPersist=10000 + +# When storing acknowledgement state, choose a more compact serialization format that stores +# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires +# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective. +managedLedgerPersistIndividualAckAsLongArray=false + +# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position" +# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged +# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages. 
+managedLedgerUnackedRangesOpenCacheSetEnabled=true + # Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into # MetadataStore. @@ -835,9 +852,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000 # Maximum backlog entry difference to prevent caching entries that can't be reused. managedLedgerMaxBacklogBetweenCursorsForCaching=1000 -# Use Open Range-Set to cache unacked messages -managedLedgerUnackedRangesOpenCacheSetEnabled=true - # Managed ledger prometheus stats latency rollover seconds (default: 60s) managedLedgerPrometheusStatsLatencyRolloverSeconds=60 @@ -1322,3 +1336,4 @@ disableBrokerInterceptors=true # Whether retain null-key message during topic compaction topicCompactionRetainNullKey=false + diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 89cc7e4fde4..924cc211e54 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -488,6 +488,16 @@ public class ManagedLedgerConfig { return maxBatchDeletedIndexToPersist; } + /** + * Set max batch deleted index that will be persisted and recovered. + * + * @param maxBatchDeletedIndexToPersist + * max batch deleted index that will be persisted and recovered. 
+ */ + public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) { + this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist; + } + public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() { return persistentUnackedRangesWithMultipleEntriesEnabled; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c25d9e9aae1..2e51bec1cda 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2195,10 +2195,22 @@ public class ServiceConfiguration implements PulsarConfiguration { + " will only be tracked in memory and messages will be redelivered in case of" + " crashes.") private int managedLedgerMaxUnackedRangesToPersist = 10000; - @FieldContext( - category = CATEGORY_STORAGE_ML, - doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate") + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch " + + "deleted indexes persisted. Batch deleted index state is handled when " + + "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n" + + "When this limit is exceeded, remaining batch message containing the batch deleted indexes will " + + "only be tracked in memory. 
In case of broker restarts or load balancing events, the batch " + + "deleted indexes will be cleared while redelivering the messages to consumers.") + private int managedLedgerMaxBatchDeletedIndexToPersist = 10000; + + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "When storing acknowledgement state, choose a more compact serialization format that stores" + + " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n" + + "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.") private boolean managedLedgerPersistIndividualAckAsLongArray = false; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" " @@ -2221,8 +2233,10 @@ public class ServiceConfiguration implements PulsarConfiguration { private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000; @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, - doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)" - ) + doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark " + + "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet " + + "implementation for the acknowledged messages tracking. 
Unacknowledged ranges are the message " + + "ranges excluding the acknowledged messages.") private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d1d60095b5b..2427a199f94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1983,6 +1983,8 @@ public class BrokerService implements Closeable { managedLedgerConfig .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); + managedLedgerConfig.setMaxBatchDeletedIndexToPersist( + serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist()); managedLedgerConfig .setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray()); managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
