This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 524db74c58f [improve][broker] PIP-434: add configurations to 
broker.conf (#24800)
524db74c58f is described below

commit 524db74c58f86691d85f50b032dd0b0d32abe06b
Author: fengyubiao <[email protected]>
AuthorDate: Thu Oct 9 12:30:37 2025 +0800

    [improve][broker] PIP-434: add configurations to broker.conf (#24800)
---
 conf/broker.conf                                   | 31 ++++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++---
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../pulsar/broker/service/StandaloneTest.java      |  6 +++++
 .../common/naming/ServiceConfigurationTest.java    |  7 ++++-
 .../configurations/pulsar_broker_test.conf         |  7 +++++
 .../pulsar_broker_test_standalone.conf             |  8 ++++++
 7 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index a1f59bd3eed..8fd0e18af36 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -330,6 +330,37 @@ maxTopicsPerNamespace=0
 # The maximum number of connections in the broker. If it exceeds, new 
connections are rejected.
 brokerMaxConnections=0
 
+# It relates to configuration "WriteBufferHighWaterMark" of Netty Channel 
Config. If the number of bytes
+# queued in the write buffer exceeds this value, channel writable state will 
start to return "false".
+pulsarChannelWriteBufferHighWaterMark=65536
+
+# It relates to configuration "WriteBufferLowWaterMark" of Netty Channel 
Config. If the number of bytes
+# queued in the write buffer is smaller than this value, channel writable 
state will start to return "true".
+pulsarChannelWriteBufferLowWaterMark=32768
+
+# If enabled, the broker will pause reading new requests from the channel once 
the write
+# buffer is full, until the channel becomes writable again.
+pulsarChannelPauseReceivingRequestsIfUnwritable=false
+
+# After the connection is recovered from a pause receiving state, the channel 
will be rate-limited
+# for a time window to avoid overwhelming due to the backlog of requests. This 
parameter defines
+# how long the rate limiting should last, in millis. Once the bytes that are 
waiting to be sent out
+# reach the "pulsarChannelWriteBufferHighWaterMark", the timer will be reset. 
Setting a negative
+# value will disable the rate limiting.
+pulsarChannelPauseReceivingCooldownMs=5000
+
+# After the connection is recovered from a pause receiving state, the channel 
will be rate-limited for a
+# period of time to avoid overwhelming due to the backlog of requests. This 
parameter defines how
+# many requests should be allowed in the rate limiting period.
+pulsarChannelPauseReceivingCooldownRateLimitPermits=5
+
+# After the connection is recovered from a pause receiving state, the channel 
will be rate-limited for a
+# period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid 
overwhelming due to the
+# backlog of requests. This parameter defines the period of the rate limiter 
in milliseconds. If the rate
+# limit period is set to 1000, then the unit is requests per 1000 
milliseconds. When it's 10, the unit
+# is requests per every 10ms.
+pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=10
+
 # The maximum number of connections per IP. If it exceeds, new connections are 
rejected.
 brokerMaxConnectionsPerIp=0
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a59cf07075a..5ca0db944a4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -951,8 +951,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_POLICIES,
-            doc = "After the connection is recovered from an pause receiving 
state, the channel will be rate-limited"
-                + " for a of time window to avoid overwhelming due to the 
backlog of requests. This parameter defines"
+            doc = "After the connection is recovered from a pause receiving 
state, the channel will be rate-limited"
+                + " for a time window to avoid overwhelming due to the backlog 
of requests. This parameter defines"
                 + " how long the rate limiting should last, in millis. Once 
the bytes that are waiting to be sent out"
                 + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the 
timer will be reset. Setting a negative"
                 + " value will disable the rate limiting."
@@ -973,7 +973,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
             + " period of time defined by 
pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the"
             + " backlog of requests. This parameter defines the period of the 
rate limiter in milliseconds. If the rate"
-            + " limit period is set to 1000, then the unit is requests per 
1000 milli seconds. When it's 10, the unit"
+            + " limit period is set to 1000, then the unit is requests per 
1000 milliseconds. When it's 10, the unit"
             + " is requests per every 10ms."
 
     )
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 011b54c0b0a..4d21b2810cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -489,7 +489,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     ctx.channel().config().setAutoRead(true);
                     pausedDueToRateLimitation = false;
                 }
-            }, 1, TimeUnit.SECONDS);
+            }, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index dc35e2d382d..9ab972e6be5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -65,5 +65,11 @@ public class StandaloneTest {
         
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
         assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 
200);
         
assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(),
 true);
+        
assertEquals(standalone.getConfig().getPulsarChannelWriteBufferHighWaterMark(), 
60000);
+        
assertEquals(standalone.getConfig().getPulsarChannelWriteBufferLowWaterMark(), 
120000);
+        
assertEquals(standalone.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable(),
 true);
+        
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownMs(), 
10_000);
+        
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(),
 100);
+        
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(),
 200);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 1802bd6f59c..ed551c8c1bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -78,7 +78,12 @@ public class ServiceConfigurationTest {
         assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), 
true);
         assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
         assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
-        assertEquals(config.isCreateTopicToRemoteClusterForReplication(), 
false);
+        assertEquals(config.getPulsarChannelWriteBufferHighWaterMark(), 60000);
+        assertEquals(config.getPulsarChannelWriteBufferLowWaterMark(), 120000);
+        
assertEquals(config.isPulsarChannelPauseReceivingRequestsIfUnwritable(), true);
+        assertEquals(config.getPulsarChannelPauseReceivingCooldownMs(), 
10_000);
+        
assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 
100);
+        
assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 
200);
         OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create(config.getProperties());
         
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
 "bookkeeper-first");
     }
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 0fdb29e0686..5ce477550e5 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -95,6 +95,13 @@ 
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
 dispatcherPauseOnAckStatePersistentEnabled=true
 
+pulsarChannelWriteBufferHighWaterMark=60000
+pulsarChannelWriteBufferLowWaterMark=120000
+pulsarChannelPauseReceivingRequestsIfUnwritable=true
+pulsarChannelPauseReceivingCooldownMs=10000
+pulsarChannelPauseReceivingCooldownRateLimitPermits=100
+pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200
+
 ### --- Transaction config variables --- ###
 transactionLogBatchedWriteEnabled=true
 transactionLogBatchedWriteMaxRecords=11
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index d3f9430f29b..0c393b1db62 100644
--- 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -95,6 +95,14 @@ 
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
 defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
 dispatcherPauseOnAckStatePersistentEnabled=true
+
+pulsarChannelWriteBufferHighWaterMark=60000
+pulsarChannelWriteBufferLowWaterMark=120000
+pulsarChannelPauseReceivingRequestsIfUnwritable=true
+pulsarChannelPauseReceivingCooldownMs=10000
+pulsarChannelPauseReceivingCooldownRateLimitPermits=100
+pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200
+
 topicNameCacheMaxCapacity=200
 maxSecondsToClearTopicNameCache=1
 createTopicToRemoteClusterForReplication=true

Reply via email to