This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 563a7cb2341 [Broker] Disable memory limit controller for broker client
and replication clients (#15723)
563a7cb2341 is described below
commit 563a7cb2341e74deba4d2aa3ee0470c288ab49eb
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 25 03:49:21 2022 +0300
[Broker] Disable memory limit controller for broker client and replication
clients (#15723)
- disable memory limit by default for broker client and replication clients
- restore maxPendingMessages and maxPendingMessagesAcrossPartitions when
memory limit is
disabled so that pre-PIP-120 default configuration is restored when limit
is disabled
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++++
.../org/apache/pulsar/broker/service/BrokerService.java | 5 +++++
.../apache/pulsar/client/impl/MemoryLimitController.java | 4 ++++
.../org/apache/pulsar/client/impl/PulsarClientImpl.java | 13 ++++++++++++-
4 files changed, 25 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b4e13f4fcf7..e3349a9f964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1397,6 +1397,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (this.client == null) {
try {
ClientConfigurationData conf = new ClientConfigurationData();
+
+ // Disable memory limit for broker client
+ conf.setMemoryLimitBytes(0);
+
conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls :
this.brokerServiceUrl);
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bb61e06eada..daea3e43828 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1176,6 +1177,10 @@ public class BrokerService implements Closeable {
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
+
+ // Disable memory limit for replication client
+ clientBuilder.memoryLimit(0, SizeUnit.BYTES);
+
if (data.getAuthenticationPlugin() != null &&
data.getAuthenticationParameters() != null) {
clientBuilder.authentication(data.getAuthenticationPlugin(),
data.getAuthenticationParameters());
} else if
(pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index e4d8388a02e..086959078a5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -106,4 +106,8 @@ public class MemoryLimitController {
public double currentUsagePercent() {
return 1.0 * currentUsage.get() / memoryLimit;
}
+
+ public boolean isMemoryLimited() {
+ return memoryLimit > 0;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e00f1f2e97c..b6f4050b452 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -95,6 +95,10 @@ public class PulsarClientImpl implements PulsarClient {
private static final int CLOSE_TIMEOUT_SECONDS = 60;
private static final double
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
+ // default limits for producers when memory limit controller is disabled
+ private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES =
1000;
+ private static final int
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
+
protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
private LookupService lookup;
@@ -262,7 +266,14 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
- return new ProducerBuilderImpl<>(this, schema);
+ ProducerBuilderImpl<T> producerBuilder = new
ProducerBuilderImpl<>(this, schema);
+ if (!memoryLimitController.isMemoryLimited()) {
+ // set default limits for producers when memory limit controller
is disabled
+
producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
+ producerBuilder.maxPendingMessagesAcrossPartitions(
+
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+ }
+ return producerBuilder;
}
@Override