This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b74fe07bd2 [improve][broker] Part-1 of PIP-434: Expose Netty channel
configuration WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests
when channel is unwritable (#24423)
1b74fe07bd2 is described below
commit 1b74fe07bd24e079434769de2c52222761d88ca1
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 25 03:45:17 2025 +0800
[improve][broker] Part-1 of PIP-434: Expose Netty channel configuration
WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel
is unwritable (#24423)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 52 ++++
.../broker/service/PulsarChannelInitializer.java | 2 +
.../apache/pulsar/broker/service/ServerCnx.java | 76 ++++-
.../pulsar/utils/TimedSingleThreadRateLimiter.java | 83 +++++
.../broker/service/utils/ClientChannelHelper.java | 3 +-
.../pulsar/client/api/MockBrokerService.java | 3 +-
.../api/PatternConsumerBackPressureTest.java | 99 ++++++
.../utils/TimedSingleThreadRateLimiterTest.java | 337 +++++++++++++++++++++
.../pulsar/common/protocol/PulsarDecoder.java | 4 +-
.../pulsar/common/protocol/PulsarHandler.java | 3 +-
.../pulsar/common/protocol/PulsarDecoderTest.java | 3 +-
.../pulsar/proxy/server/DirectProxyHandler.java | 3 +-
12 files changed, 657 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5d2d94ea477..a59cf07075a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -927,6 +927,58 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int brokerMaxConnections = 0;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "It relates to configuration \"WriteBufferHighWaterMark\" of
Netty Channel Config. If the number of bytes"
+ + " queued in the write buffer exceeds this value, channel
writable state will start to return \"false\"."
+ )
+ private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "It relates to configuration \"WriteBufferLowWaterMark\" of
Netty Channel Config. If the number of bytes"
+ + " queued in the write buffer is smaller than this value,
channel writable state will start to return"
+ + " \"true\"."
+ )
+ private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "If enabled, the broker will pause reading from the channel to
deal with new requests once the write
+ + " buffer is full, until it is changed to writable."
+ )
+ private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving
state, the channel will be rate-limited"
+ + " for a time window to avoid overwhelming due to the
backlog of requests. This parameter defines"
+ + " how long the rate limiting should last, in millis. Once
the bytes that are waiting to be sent out"
+ + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the
timer will be reset. Setting a negative"
+ + " value will disable the rate limiting."
+ )
+ private int pulsarChannelPauseReceivingCooldownMs = 5000;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving state,
the channel will be rate-limited for a"
+ + " period of time to avoid overwhelming due to the backlog of
requests. This parameter defines how"
+ + " many requests should be allowed in the rate limiting period."
+
+ )
+ private int pulsarChannelPauseReceivingCooldownRateLimitPermits = 5;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving state,
the channel will be rate-limited for a"
+ + " period of time defined by
pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the"
+ + " backlog of requests. This parameter defines the period of the
rate limiter in milliseconds. If the rate"
+ + " limit period is set to 1000, then the unit is requests per
1000 milli seconds. When it's 10, the unit"
+ + " is requests per every 10ms."
+
+ )
+ private int pulsarChannelPauseReceivingCooldownRateLimitPeriodMs = 10;
+
@FieldContext(
category = CATEGORY_POLICIES,
doc = "The maximum number of connections per IP. If it exceeds, new
connections are rejected."
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 68da1083d22..dffd8260d46 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -81,6 +81,8 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
// disable auto read explicitly so that requests aren't served until
auto read is enabled
// ServerCnx must enable auto read in channelActive after
PulsarService is ready to accept incoming requests
ch.config().setAutoRead(false);
+
ch.config().setWriteBufferHighWaterMark(pulsar.getConfig().getPulsarChannelWriteBufferHighWaterMark());
+
ch.config().setWriteBufferLowWaterMark(pulsar.getConfig().getPulsarChannelWriteBufferLowWaterMark());
ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024, true));
if (this.enableTls) {
ch.pipeline().addLast(TLS_HANDLER, new
SslHandler(this.sslFactory.createServerSslEngine(ch.alloc())));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1c61d4c467f..011b54c0b0a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,6 +37,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundBuffer;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.FastThreadLocal;
@@ -182,6 +183,7 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.utils.TimedSingleThreadRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,6 +194,8 @@ import org.slf4j.LoggerFactory;
* parameter instance lifecycle.
*/
public class ServerCnx extends PulsarHandler implements TransportCnx {
+ private static final Logger PAUSE_RECEIVING_LOG =
LoggerFactory.getLogger(ServerCnx.class.getName()
+ + ".pauseReceiving");
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
@@ -251,6 +255,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final long connectionLivenessCheckTimeoutMillis;
private final TopicsPattern.RegexImplementation
topicsPatternImplementation;
+ private final boolean pauseReceivingRequestsIfUnwritable;
+ private final TimedSingleThreadRateLimiter requestRateLimiter;
+ private final int pauseReceivingCooldownMilliSeconds;
+ private boolean pausedDueToRateLimitation = false;
// Tracks and limits number of bytes pending to be published from a single
specific IO thread.
static final class PendingBytesPerThreadTracker {
@@ -314,6 +322,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// the null check is a workaround for #13620
super(pulsar.getBrokerService() != null ?
pulsar.getBrokerService().getKeepAliveIntervalSeconds() : 0,
TimeUnit.SECONDS);
+ this.pauseReceivingRequestsIfUnwritable =
+
pulsar.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable();
+ this.requestRateLimiter = new TimedSingleThreadRateLimiter(
+
pulsar.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(),
+
pulsar.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(),
+ TimeUnit.MILLISECONDS);
+ this.pauseReceivingCooldownMilliSeconds =
+ pulsar.getConfig().getPulsarChannelPauseReceivingCooldownMs();
this.service = pulsar.getBrokerService();
this.schemaService = pulsar.getSchemaRegistryService();
this.listenerName = listenerName;
@@ -442,11 +458,62 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
+ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand
cmd) {
+ if (!pauseReceivingRequestsIfUnwritable
+ || pauseReceivingCooldownMilliSeconds <= 0 || cmd.getType() ==
BaseCommand.Type.PONG
+ || cmd.getType() == BaseCommand.Type.PING) {
+ return;
+ }
+ if (PAUSE_RECEIVING_LOG.isDebugEnabled()) {
+ final ChannelOutboundBuffer outboundBuffer =
ctx.channel().unsafe().outboundBuffer();
+ if (outboundBuffer != null) {
+ PAUSE_RECEIVING_LOG.debug("Start to handle request [{}],
totalPendingWriteBytes: {}, channel"
+ + " isWritable: {}", cmd.getType(),
outboundBuffer.totalPendingWriteBytes(),
+ ctx.channel().isWritable());
+ } else {
+ PAUSE_RECEIVING_LOG.debug("Start to handle request [{}],
channel isWritable: {}",
+ cmd.getType(), ctx.channel().isWritable());
+ }
+ }
+ // "requestRateLimiter" grants every requested permit unless its
rate-limiting window is open, i.e. unless
+ // "timingOpen(duration)" was called and has not yet expired.
+ if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
+ log.warn("[{}] Reached rate limitation", this);
+ // Stop receiving requests.
+ pausedDueToRateLimitation = true;
+ ctx.channel().config().setAutoRead(false);
+ // Resume after 1 second.
+ ctx.channel().eventLoop().schedule(() -> {
+ if (pausedDueToRateLimitation) {
+ log.info("[{}] Resuming connection after rate limitation",
this);
+ ctx.channel().config().setAutoRead(true);
+ pausedDueToRateLimitation = false;
+ }
+ }, 1, TimeUnit.SECONDS);
+ }
+ }
+
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
- if (log.isDebugEnabled()) {
- log.debug("Channel writability has changed to: {}",
ctx.channel().isWritable());
+ if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+ log.info("[{}] is writable, turn on channel auto-read", this);
+ ctx.channel().config().setAutoRead(true);
+ requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds,
TimeUnit.MILLISECONDS);
+ } else if (pauseReceivingRequestsIfUnwritable &&
!ctx.channel().isWritable()) {
+ final ChannelOutboundBuffer outboundBuffer =
ctx.channel().unsafe().outboundBuffer();
+ if (outboundBuffer != null) {
+ if (PAUSE_RECEIVING_LOG.isDebugEnabled()) {
+ PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off
channel auto-read,"
+ + " totalPendingWriteBytes: {}", this,
outboundBuffer.totalPendingWriteBytes());
+ }
+ } else {
+ if (PAUSE_RECEIVING_LOG.isDebugEnabled()) {
+ PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off
channel auto-read", this);
+ }
+ }
+ ctx.channel().config().setAutoRead(false);
}
+ ctx.fireChannelWritabilityChanged();
}
@Override
@@ -3652,8 +3719,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
@Override
- protected void messageReceived() {
- super.messageReceived();
+ protected void messageReceived(BaseCommand cmd) {
+ checkPauseReceivingRequestsAfterResumeRateLimit(cmd);
+ super.messageReceived(cmd);
if (connectionCheckInProgress != null &&
!connectionCheckInProgress.isDone()) {
connectionCheckInProgress.complete(Optional.of(true));
connectionCheckInProgress = null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java
new file mode 100644
index 00000000000..80043fb67f8
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TimedSingleThreadRateLimiter {
+
+ @Getter
+ private final int rate;
+ @Getter
+ private final long periodAtMs;
+ private long lastTimeReset;
+ @Getter
+ private int remaining;
+ private long closeAfterAtMs;
+
+ public TimedSingleThreadRateLimiter(final int rate, final long period,
final TimeUnit unit) {
+ this.rate = rate;
+ this.periodAtMs = unit.toMillis(period);
+ this.lastTimeReset = System.currentTimeMillis();
+ this.remaining = rate;
+ }
+
+ public int acquire(int permits) {
+ final long now = System.currentTimeMillis();
+ if (permits < 0) {
+ return 0;
+ }
+ if (now > closeAfterAtMs) {
+ return permits;
+ }
+ mayRenew(now);
+ if (remaining > permits) {
+ remaining -= permits;
+ if (log.isDebugEnabled()) {
+ log.debug("acquired: {}, remaining:{}", permits, remaining);
+ }
+ return permits;
+ } else {
+ int acquired = remaining;
+ remaining = 0;
+ if (log.isDebugEnabled()) {
+ log.debug("acquired: {}, remaining:{}", acquired, remaining);
+ }
+ return acquired;
+ }
+ }
+
+ public void timingOpen(long closeAfter, final TimeUnit unit) {
+ if (closeAfter <= 0) {
+ this.closeAfterAtMs = 0;
+ } else {
+ this.closeAfterAtMs = System.currentTimeMillis() +
unit.toMillis(closeAfter);
+ }
+ }
+
+ private void mayRenew(long now) {
+ if (now > lastTimeReset + periodAtMs) {
+ remaining = rate;
+ lastTimeReset = now;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 0ad8ca8f1c7..d6210767ed1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.Queue;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
@@ -71,7 +72,7 @@ public class ClientChannelHelper {
private final PulsarDecoder decoder = new PulsarDecoder() {
@Override
- protected void messageReceived() {
+ protected void messageReceived(BaseCommand cmd) {
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index e66880738cf..b7e0ee42903 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -46,6 +46,7 @@ import
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSendHook;
import
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSubscribeHook;
import
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandTopicLookupHook;
import
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandUnsubscribeHook;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
@@ -132,7 +133,7 @@ public class MockBrokerService {
}
@Override
- protected void messageReceived() {
+ protected void messageReceived(BaseCommand cmd) {
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java
new file mode 100644
index 00000000000..aa3e17c2ea0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerBackPressureTest extends
MockedPulsarServiceBaseTest {
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ isTcpLookup = true;
+ conf.setEnableBrokerSideSubscriptionPatternEvaluation(false);
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ conf.setPulsarChannelPauseReceivingRequestsIfUnwritable(true);
+ // 1m.
+ conf.setPulsarChannelWriteBufferHighWaterMark(1 * 1024 * 1024);
+ // 32k.
+ conf.setPulsarChannelWriteBufferLowWaterMark(32 * 1024);
+ }
+
+ @Test(timeOut = 60 * 1000)
+ public void testInfiniteGetThousandsTopics() throws PulsarAdminException,
InterruptedException {
+ final int topicCount = 8192;
+ final int requests = 2048;
+ final String topicName = UUID.randomUUID().toString();
+ admin.topics().createPartitionedTopic(topicName, topicCount);
+ final ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime()
+ .availableProcessors());
+
+ final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl)
pulsarClient;
+ final AtomicInteger success = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(requests);
+ for (int i = 0; i < requests; i++) {
+ executorService.execute(() -> {
+ pulsarClientImpl.getLookup()
+ .getTopicsUnderNamespace(NamespaceName.get("public",
"default"),
+ CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*",
"")
+ .whenComplete((result, ex) -> {
+ if (ex == null) {
+ success.incrementAndGet();
+ } else {
+ log.error("Failed to get topic list.", ex);
+ }
+ log.info("latch-count: {}, succeed: {}",
latch.getCount(), success.get());
+ latch.countDown();
+ });
+ });
+ }
+ latch.await();
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(success.get(), requests);
+ });
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java
new file mode 100644
index 00000000000..183d68e81cc
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for TimedSingleThreadRateLimiter class.
+ */
+public class TimedSingleThreadRateLimiterTest {
+
+ @Test
+ public void testConstructorAndGetters() {
+ int rate = 100;
+ long period = 5;
+ TimeUnit unit = TimeUnit.SECONDS;
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(rate, period, unit);
+ assertEquals(limiter.getRate(), rate);
+ assertEquals(limiter.getPeriodAtMs(), unit.toMillis(period));
+ assertEquals(limiter.getRemaining(), rate); // Initially should have
all permits
+ }
+
+ @Test
+ public void testConstructorWithDifferentTimeUnits() {
+ // Test with milliseconds
+ TimedSingleThreadRateLimiter limiterMs = new
TimedSingleThreadRateLimiter(50, 1000, TimeUnit.MILLISECONDS);
+ assertEquals(limiterMs.getPeriodAtMs(), 1000);
+ // Test with seconds
+ TimedSingleThreadRateLimiter limiterSec = new
TimedSingleThreadRateLimiter(50, 2, TimeUnit.SECONDS);
+ assertEquals(limiterSec.getPeriodAtMs(), 2000);
+ // Test with minutes
+ TimedSingleThreadRateLimiter limiterMin = new
TimedSingleThreadRateLimiter(50, 1, TimeUnit.MINUTES);
+ assertEquals(limiterMin.getPeriodAtMs(), 60000);
+ }
+
+ @Test
+ public void testBasicAcquire() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(100, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Test acquiring single permit
+ int acquired = limiter.acquire(1);
+ assertEquals(acquired, 1);
+ assertEquals(limiter.getRemaining(), 99);
+ // Test acquiring multiple permits
+ acquired = limiter.acquire(10);
+ assertEquals(acquired, 10);
+ assertEquals(limiter.getRemaining(), 89);
+ }
+
+ @Test
+ public void testAcquireMoreThanRemaining() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Acquire most permits
+ int acquired = limiter.acquire(8);
+ assertEquals(acquired, 8);
+ assertEquals(limiter.getRemaining(), 2);
+ // Try to acquire more than remaining
+ acquired = limiter.acquire(5);
+ assertEquals(acquired, 2); // Should only get remaining permits
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testAcquireWhenNoPermitsRemaining() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(5, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Exhaust all permits
+ limiter.acquire(5);
+ assertEquals(limiter.getRemaining(), 0);
+ // Try to acquire when no permits left
+ int acquired = limiter.acquire(3);
+ assertEquals(acquired, 0);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testAcquireZeroPermits() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ int acquired = limiter.acquire(0);
+ assertEquals(acquired, 0);
+ assertEquals(limiter.getRemaining(), 10); // Should remain unchanged
+ }
+
+ @Test
+ public void testPermitRenewalAfterPeriod() throws InterruptedException {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 100, TimeUnit.MILLISECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Exhaust all permits
+ limiter.acquire(10);
+ assertEquals(limiter.getRemaining(), 0);
+ // Wait for period to pass
+ Thread.sleep(150);
+ // Acquire should trigger renewal
+ int acquired = limiter.acquire(5);
+ assertEquals(acquired, 5);
+ assertEquals(limiter.getRemaining(), 5);
+ }
+
+ @Test
+ public void testNoRenewalBeforePeriodExpires() throws InterruptedException
{
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Exhaust all permits
+ limiter.acquire(10);
+ assertEquals(limiter.getRemaining(), 0);
+ // Should not renew yet
+ int acquired = limiter.acquire(5);
+ assertEquals(acquired, 0);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testTimingOpen() throws Exception {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ // Set timing to open for 500ms
+ limiter.timingOpen(500, TimeUnit.MILLISECONDS);
+ // During open period.
+ int acquired = limiter.acquire(15);
+ assertEquals(acquired, 10);
+ assertEquals(limiter.getRemaining(), 0);
+ // Closed.
+ Thread.sleep(1000);
+ int acquired2 = limiter.acquire(1000);
+ assertEquals(acquired2, 1000);
+ }
+
+ @Test
+ public void testTimingOpenWithZeroDuration() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ // Set timing to open for 0 duration.
+ limiter.timingOpen(0, TimeUnit.MILLISECONDS);
+ // Closed.
+ int acquired = limiter.acquire(7000);
+ assertEquals(acquired, 7000);
+ }
+
+ @Test
+ public void testHighRateAcquisition() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(1000, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Acquire permits in chunks
+ int totalAcquired = 0;
+ for (int i = 0; i < 10; i++) {
+ totalAcquired += limiter.acquire(100);
+ }
+ assertEquals(totalAcquired, 1000);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testLowRateAcquisition() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(3, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Acquire all permits one by one
+ assertEquals(limiter.acquire(1), 1);
+ assertEquals(limiter.getRemaining(), 2);
+ assertEquals(limiter.acquire(1), 1);
+ assertEquals(limiter.getRemaining(), 1);
+ assertEquals(limiter.acquire(1), 1);
+ assertEquals(limiter.getRemaining(), 0);
+ // No more permits available
+ assertEquals(limiter.acquire(1), 0);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testRenewalWithPartialAcquisition() throws
InterruptedException {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 100, TimeUnit.MILLISECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Acquire some permits
+ limiter.acquire(6);
+ assertEquals(limiter.getRemaining(), 4);
+ // Wait for renewal
+ Thread.sleep(150);
+ // After renewal, should have full rate again
+ int acquired = limiter.acquire(8);
+ assertEquals(acquired, 8);
+ assertEquals(limiter.getRemaining(), 2);
+ }
+
+ @Test
+ public void testConcurrentBehaviorSimulation() throws InterruptedException
{
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(20, 100, TimeUnit.MILLISECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Simulate rapid acquisitions
+ int totalAcquired = 0;
+ for (int i = 0; i < 5; i++) {
+ totalAcquired += limiter.acquire(5);
+ }
+ assertEquals(totalAcquired, 20);
+ assertEquals(limiter.getRemaining(), 0);
+ // Wait for renewal
+ Thread.sleep(150);
+ // Should be able to acquire again
+ int newAcquired = limiter.acquire(10);
+ assertEquals(newAcquired, 10);
+ assertEquals(limiter.getRemaining(), 10);
+ }
+
+ @Test
+ public void testVeryShortPeriod() throws InterruptedException {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(5, 10, TimeUnit.MILLISECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Exhaust permits
+ limiter.acquire(5);
+ assertEquals(limiter.getRemaining(), 0);
+ // Wait for very short period
+ Thread.sleep(20);
+ // Should renew quickly
+ int acquired = limiter.acquire(3);
+ assertEquals(acquired, 3);
+ assertEquals(limiter.getRemaining(), 2);
+ }
+
+ @Test
+ public void testVeryLongPeriod() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.HOURS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ assertEquals(limiter.getPeriodAtMs(), TimeUnit.HOURS.toMillis(1));
+ // Acquire some permits
+ int acquired = limiter.acquire(7);
+ assertEquals(acquired, 7);
+ assertEquals(limiter.getRemaining(), 3);
+ // An immediate second acquire must not renew (period is 1 hour)
+ int acquired2 = limiter.acquire(5);
+ assertEquals(acquired2, 3); // Only remaining permits
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testSinglePermitRate() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(1, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ assertEquals(limiter.getRate(), 1);
+ assertEquals(limiter.getRemaining(), 1);
+ // Acquire the only permit
+ int acquired = limiter.acquire(1);
+ assertEquals(acquired, 1);
+ assertEquals(limiter.getRemaining(), 0);
+ // Try to acquire more
+ acquired = limiter.acquire(1);
+ assertEquals(acquired, 0);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testLargePermitRequest() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Request much more than available
+ int acquired = limiter.acquire(1000);
+ assertEquals(acquired, 10); // Should get all available permits
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testNegativePermitRequest() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Request negative permits (edge case)
+ int acquired = limiter.acquire(-5);
+ // The implementation explicitly rejects negative permit requests:
+ // acquire() returns 0 and leaves the remaining permits untouched
+ assertEquals(acquired, 0); // Should not return negative
+ assertEquals(limiter.getRemaining(), 10); // Remaining should not go
negative
+ }
+
+ @Test
+ public void testMultipleRenewalCycles() throws InterruptedException {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(5, 50, TimeUnit.MILLISECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // First cycle
+ limiter.acquire(5);
+ assertEquals(limiter.getRemaining(), 0);
+ // Wait for first renewal
+ Thread.sleep(60);
+ limiter.acquire(3);
+ assertEquals(limiter.getRemaining(), 2);
+ // Wait for second renewal
+ Thread.sleep(60);
+ int acquired = limiter.acquire(4);
+ assertEquals(acquired, 4);
+ assertEquals(limiter.getRemaining(), 1);
+ // Wait for third renewal
+ Thread.sleep(60);
+ acquired = limiter.acquire(5);
+ assertEquals(acquired, 5);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testRapidAcquisitionPattern() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(100, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Simulate rapid small acquisitions
+ int totalAcquired = 0;
+ for (int i = 0; i < 50; i++) {
+ totalAcquired += limiter.acquire(2);
+ }
+ assertEquals(totalAcquired, 100);
+ assertEquals(limiter.getRemaining(), 0);
+ }
+
+ @Test
+ public void testBurstAcquisitionPattern() {
+ TimedSingleThreadRateLimiter limiter = new
TimedSingleThreadRateLimiter(50, 1, TimeUnit.SECONDS);
+ limiter.timingOpen(10, TimeUnit.SECONDS);
+ // Large burst acquisition
+ int acquired1 = limiter.acquire(30);
+ assertEquals(acquired1, 30);
+ assertEquals(limiter.getRemaining(), 20);
+ // Another burst
+ int acquired2 = limiter.acquire(25);
+ assertEquals(acquired2, 20); // Only remaining permits
+ assertEquals(limiter.getRemaining(), 0);
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c05b1d796df..b61664d9571 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -124,7 +124,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
if (log.isDebugEnabled()) {
log.debug("[{}] Received cmd {}", ctx.channel(),
cmd.getType());
}
- messageReceived();
+ messageReceived(cmd);
switch (cmd.getType()) {
case PARTITIONED_METADATA:
@@ -486,7 +486,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
}
}
- protected abstract void messageReceived();
+ protected abstract void messageReceived(BaseCommand cmd);
private ServerError getServerError(int errorCode) {
ServerError serverError = ServerError.valueOf(errorCode);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 020b753086f..e8010ea1a51 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -26,6 +26,7 @@ import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import lombok.Setter;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
@@ -61,7 +62,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
}
@Override
- protected void messageReceived() {
+ protected void messageReceived(BaseCommand cmd) {
waitingForPingResponse = false;
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
index 6fd77c16624..af68f6cd28b 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.testng.annotations.Test;
@@ -43,7 +44,7 @@ public class PulsarDecoderTest {
}
@Override
- protected void messageReceived() {
+ protected void messageReceived(BaseCommand cmd) {
}
});
decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 775108a75e6..5f4456d356e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.FeatureFlags;
@@ -395,7 +396,7 @@ public class DirectProxyHandler {
}
@Override
- protected void messageReceived() {
+ protected void messageReceived(BaseCommand cmd) {
// no-op
}