This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new e166f75 Disable channel auto read when publish rate or publish buffer exceeded (#6550)
e166f75 is described below
commit e166f75ea7dfcaf1170d89955fe2011a3c8dd9c4
Author: lipenghui <[email protected]>
AuthorDate: Thu Mar 19 11:55:16 2020 +0800
Disable channel auto read when publish rate or publish buffer exceeded (#6550)
Disable channel auto-read when the publish rate or publish buffer limit is
exceeded.
Currently, ServerCnx sets channel auto-read to false only when it receives a new
message while the publish rate or publish buffer limit is already exceeded. So
each connection reads one more message before it stops reading. If there are
many ServerCnx instances (many topics or clients), the effective publish rate
deviates widely from the configured limit. Here is an example that shows the
problem.
Enable the publish rate limit in broker.conf:
```
brokerPublisherThrottlingTickTimeMillis=1
brokerPublisherThrottlingMaxByteRate=10000000
```
Use pulsar-perf to test message publishing to a topic with 100 partitions:
```
bin/pulsar-perf produce -s 500000 -r 100000 -t 1 100p
```
The test result:
```
10:45:28.844 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 367.8 msg/s --- 1402.9 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 710.008 ms - med: 256.969 - 95pct: 2461.439 - 99pct: 3460.255 - 99.9pct: 4755.007 - 99.99pct: 4755.007 - Max: 4755.007
10:45:38.919 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 456.6 msg/s --- 1741.9 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 2551.341 ms - med: 2347.599 - 95pct: 6852.639 - 99pct: 9630.015 - 99.9pct: 10824.319 - 99.99pct: 10824.319 - Max: 10824.319
10:45:48.959 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 432.0 msg/s --- 1648.0 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 4373.505 ms - med: 3972.047 - 95pct: 11754.687 - 99pct: 15713.663 - 99.9pct: 17638.527 - 99.99pct: 17705.727 - Max: 17705.727
10:45:58.996 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 430.6 msg/s --- 1642.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5993.563 ms - med: 4291.071 - 95pct: 18022.527 - 99pct: 21649.663 - 99.9pct: 24885.375 - 99.99pct: 25335.551 - Max: 25335.551
10:46:09.195 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 403.2 msg/s --- 1538.3 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 7883.304 ms - med: 6184.159 - 95pct: 23625.343 - 99pct: 29524.991 - 99.9pct: 30813.823 - 99.99pct: 31467.775 - Max: 31467.775
10:46:19.314 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 401.1 msg/s --- 1530.1 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 9587.407 ms - med: 6907.007 - 95pct: 28524.927 - 99pct: 34815.999 - 99.9pct: 36759.551 - 99.99pct: 37581.567 - Max: 37581.567
10:46:29.389 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 372.8 msg/s --- 1422.0 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 11984.595 ms - med: 10095.231 - 95pct: 34515.967 - 99pct: 40754.175 - 99.9pct: 43553.535 - 99.99pct: 43603.199 - Max: 43603.199
10:46:39.459 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 374.6 msg/s --- 1429.1 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 12208.459 ms - med: 7807.455 - 95pct: 38799.871 - 99pct: 46936.575 - 99.9pct: 50500.095 - 99.99pct: 50500.095 - Max: 50500.095
10:46:49.537 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 295.6 msg/s --- 1127.5 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 14503.565 ms - med: 10753.087 - 95pct: 45041.407 - 99pct: 54307.327 - 99.9pct: 57786.623 - 99.99pct: 57786.623 - Max: 57786.623
```
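To put the test result in perspective, the configured byte limit can be
converted into an allowed message rate and compared with the observed one. The
sketch below is only illustrative arithmetic, not broker code. It assumes that
`-s 500000` is the message size in bytes and that the 10000000 bytes/s limit
applies to the whole test; the class and method names are hypothetical.

```java
final class PublishRateMath {
    /** Messages per second that fit under a byte-rate limit for a fixed message size. */
    static double allowedMsgPerSec(long maxBytesPerSec, long messageSizeBytes) {
        return (double) maxBytesPerSec / messageSizeBytes;
    }

    /** How many times the observed message rate exceeds the allowed message rate. */
    static double overshootFactor(double observedMsgPerSec, double allowedMsgPerSec) {
        return observedMsgPerSec / allowedMsgPerSec;
    }

    public static void main(String[] args) {
        // Values taken from broker.conf and the pulsar-perf command in this message.
        double allowed = allowedMsgPerSec(10_000_000L, 500_000L);
        // 367.8 msg/s is the first sample in the test result.
        double factor = overshootFactor(367.8, allowed);
        System.out.printf("allowed: %.1f msg/s, overshoot: %.1fx%n", allowed, factor);
    }
}
```

Under these assumptions the limit allows about 20 msg/s, while the samples range
from 295.6 to 456.6 msg/s, roughly 15 to 23 times the configured rate.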
The cause of such a large deviation is that the producers send batched
messages and each ServerCnx reads one more message before it stops reading.
This PR cannot completely solve the problem, but it alleviates it. When the
message publish rate is exceeded, the broker sets channel auto-read to false
for all topics. This prevents some of the ServerCnx instances from reading one
more message.
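The difference between the two behaviours can be illustrated with a toy model.
This is a sketch under strong assumptions, not the broker implementation: it
has no Netty channels, every connection always has a message ready, connections
are served round-robin, and the broker-wide disable happens immediately rather
than on the periodic rate check. All names are hypothetical.

```java
import java.util.Arrays;

final class AutoReadSketch {
    /**
     * Returns the bytes accepted before every connection has stopped reading.
     * eagerDisable=false: a connection stops only after its own read observes the exceeded limit.
     * eagerDisable=true: the first read that reaches the limit stops all connections.
     */
    static long simulateTick(int connections, long messageSize, long byteLimit, boolean eagerDisable) {
        boolean[] autoRead = new boolean[connections];
        Arrays.fill(autoRead, true);
        long total = 0;
        boolean anyReadable = true;
        while (anyReadable) {
            anyReadable = false;
            for (int i = 0; i < connections; i++) {
                if (!autoRead[i]) {
                    continue; // this connection has already stopped reading
                }
                total += messageSize; // connection i reads one message
                if (total >= byteLimit) {
                    if (eagerDisable) {
                        Arrays.fill(autoRead, false); // stop every connection at once
                    } else {
                        autoRead[i] = false; // only the connection that just read stops
                    }
                }
                anyReadable |= autoRead[i];
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // 100 connections, 500000-byte messages, 10000000-byte budget (values from the example).
        System.out.println("lazy:  " + simulateTick(100, 500_000L, 10_000_000L, false));
        System.out.println("eager: " + simulateTick(100, 500_000L, 10_000_000L, true));
    }
}
```

In this model the per-connection variant accepts 59500000 bytes (119 messages)
against a 10000000-byte budget, while the broker-wide variant stops at exactly
10000000 bytes. The real change disables reads from the periodic rate check, so
some connections can still read one more message, which is why the PR only
alleviates the deviation.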
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
- Does this pull request introduce a new feature? (no)
(cherry picked from commit ec31d549e7aacf8b8fe54be650adc3b36e04446c)
---
.../org/apache/pulsar/broker/service/AbstractTopic.java | 6 ++++++
.../org/apache/pulsar/broker/service/BrokerService.java | 3 +++
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 14 ++++++++++----
3 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 638c8c7..42d30d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -296,6 +296,12 @@ public abstract class AbstractTopic implements Topic {
}
}
+ protected void disableProducerRead() {
+ if (producers != null) {
+ producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
+ }
+ }
+
protected void checkTopicFenced() throws BrokerServiceException {
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index eb8a05f..be51fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1129,6 +1129,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
public void checkBrokerPublishThrottlingRate() {
brokerPublishRateLimiter.checkPublishRate();
+ if (brokerPublishRateLimiter.isPublishRateExceeded()) {
+ forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead());
+ }
}
private void refreshBrokerPublishRate() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2a78508..89bf17c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
@@ -143,7 +144,7 @@ public class ServerCnx extends PulsarHandler {
private final boolean schemaValidationEnforced;
private String authMethod = "none";
private final int maxMessageSize;
-
+
// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
@@ -1608,10 +1609,15 @@ public class ServerCnx extends PulsarHandler {
ctx.channel().config().setAutoRead(true);
// triggers channel read
ctx.read();
- autoReadDisabledRateLimiting = false;
}
}
-
+
+ void disableCnxAutoRead() {
+ if (ctx.channel().config().isAutoRead() ) {
+ ctx.channel().config().setAutoRead(false);
+ }
+ }
+
private <T> ServerError getErrorCode(CompletableFuture<T> future) {
ServerError error = ServerError.UnknownError;
try {
@@ -1695,4 +1701,4 @@ public class ServerCnx extends PulsarHandler {
public String getClientVersion() {
return clientVersion;
}
-}
\ No newline at end of file
+}