This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 44b55f9eaa46f4cc0ad44c8a1a2aad6356bef906 Author: fengyubiao <[email protected]> AuthorDate: Thu Oct 9 12:30:37 2025 +0800 [improve][broker] PIP-434: add configurations to broker.conf (#24800) (cherry picked from commit 524db74c58f86691d85f50b032dd0b0d32abe06b) --- conf/broker.conf | 31 ++++++++++++++++++++++ .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++--- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../pulsar/broker/service/StandaloneTest.java | 6 +++++ .../common/naming/ServiceConfigurationTest.java | 7 ++++- .../configurations/pulsar_broker_test.conf | 7 +++++ .../pulsar_broker_test_standalone.conf | 8 ++++++ 7 files changed, 62 insertions(+), 5 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index ec561141ac0..171d1adf464 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -327,6 +327,37 @@ maxTopicsPerNamespace=0 # The maximum number of connections in the broker. If it exceeds, new connections are rejected. brokerMaxConnections=0 +# It relates to configuration "WriteBufferHighWaterMark" of Netty Channel Config. If the number of bytes +# queued in the write buffer exceeds this value, channel writable state will start to return "false". +pulsarChannelWriteBufferHighWaterMark=65536 + +# It relates to configuration "WriteBufferLowWaterMark" of Netty Channel Config. If the number of bytes +# queued in the write buffer is smaller than this value, channel writable state will start to return "true". +pulsarChannelWriteBufferLowWaterMark=32768 + +# If enabled, the broker will pause reading from the channel to deal with new request once the writer +# buffer is full, until it is changed to writable. +pulsarChannelPauseReceivingRequestsIfUnwritable=false + +# After the connection is recovered from a pause receiving state, the channel will be rate-limited +# for a time window to avoid overwhelming due to the backlog of requests. This parameter defines +# how long the rate limiting should last, in millis. 
Once the bytes that are waiting to be sent out +# reach the "pulsarChannelWriteBufferHighWaterMark", the timer will be reset. Setting a negative +# value will disable the rate limiting. +pulsarChannelPauseReceivingCooldownMs=5000 + +# After the connection is recovered from a pause receiving state, the channel will be rate-limited for a +# period of time to avoid overwhelming due to the backlog of requests. This parameter defines how +# many requests should be allowed in the rate limiting period. +pulsarChannelPauseReceivingCooldownRateLimitPermits=5 + +# After the connection is recovered from a pause receiving state, the channel will be rate-limited for a +# period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the +# backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate +# limit period is set to 1000, then the unit is requests per 1000 milliseconds. When it's 10, the unit +# is requests per every 10ms. +pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=10 + # The maximum number of connections per IP. If it exceeds, new connections are rejected. brokerMaxConnectionsPerIp=0 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4ce68ca2ad1..aed216aa309 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -943,8 +943,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, - doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" - + " for a of time window to avoid overwhelming due to the backlog of requests. 
This parameter defines" + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited" + + " for a time window to avoid overwhelming due to the backlog of requests. This parameter defines" + " how long the rate limiting should last, in millis. Once the bytes that are waiting to be sent out" + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + " value will disable the rate limiting." @@ -965,7 +965,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + " period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the" + " backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate" - + " limit period is set to 1000, then the unit is requests per 1000 milli seconds. When it's 10, the unit" + + " limit period is set to 1000, then the unit is requests per 1000 milliseconds. When it's 10, the unit" + " is requests per every 10ms." 
) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 074ed2617e9..1b38d91f888 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -490,7 +490,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ctx.channel().config().setAutoRead(true); pausedDueToRateLimitation = false; } - }, 1, TimeUnit.SECONDS); + }, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index dc35e2d382d..9ab972e6be5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -65,5 +65,11 @@ public class StandaloneTest { assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true); + assertEquals(standalone.getConfig().getPulsarChannelWriteBufferHighWaterMark(), 60000); + assertEquals(standalone.getConfig().getPulsarChannelWriteBufferLowWaterMark(), 120000); + assertEquals(standalone.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable(), true); + assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownMs(), 10_000); + assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 100); + assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 1802bd6f59c..ed551c8c1bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -78,7 +78,12 @@ public class ServiceConfigurationTest { assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); assertEquals(config.getTopicNameCacheMaxCapacity(), 200); - assertEquals(config.isCreateTopicToRemoteClusterForReplication(), false); + assertEquals(config.getPulsarChannelWriteBufferHighWaterMark(), 60000); + assertEquals(config.getPulsarChannelWriteBufferLowWaterMark(), 120000); + assertEquals(config.isPulsarChannelPauseReceivingRequestsIfUnwritable(), true); + assertEquals(config.getPulsarChannelPauseReceivingCooldownMs(), 10_000); + assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 100); + assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 0fdb29e0686..5ce477550e5 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -95,6 +95,13 @@ defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true +pulsarChannelWriteBufferHighWaterMark=60000 +pulsarChannelWriteBufferLowWaterMark=120000 +pulsarChannelPauseReceivingRequestsIfUnwritable=true 
+pulsarChannelPauseReceivingCooldownMs=10000 +pulsarChannelPauseReceivingCooldownRateLimitPermits=100 +pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200 + ### --- Transaction config variables --- ### transactionLogBatchedWriteEnabled=true transactionLogBatchedWriteMaxRecords=11 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index d3f9430f29b..0c393b1db62 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,6 +95,14 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true + +pulsarChannelWriteBufferHighWaterMark=60000 +pulsarChannelWriteBufferLowWaterMark=120000 +pulsarChannelPauseReceivingRequestsIfUnwritable=true +pulsarChannelPauseReceivingCooldownMs=10000 +pulsarChannelPauseReceivingCooldownRateLimitPermits=100 +pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200 + topicNameCacheMaxCapacity=200 maxSecondsToClearTopicNameCache=1 createTopicToRemoteClusterForReplication=true
