This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a56f04140e9 [Broker] Disable memory limit controller for broker client 
and replication clients (#15723)
a56f04140e9 is described below

commit a56f04140e94cefcaad3ce3d92b70c13337227b7
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 25 03:49:21 2022 +0300

    [Broker] Disable memory limit controller for broker client and replication 
clients (#15723)
    
    - disable memory limit by default for broker client and replication clients
    - restore maxPendingMessages and maxPendingMessagesAcrossPartitions when 
memory limit is
      disabled so that pre-PIP-120 default configuration is restored when limit 
is disabled
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   |  4 ++++
 .../org/apache/pulsar/broker/service/BrokerService.java     |  5 +++++
 .../org/apache/pulsar/client/impl/PulsarClientImpl.java     | 13 ++++++++++++-
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 73843071f98..1c226b2705a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1364,6 +1364,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         if (this.client == null) {
             try {
                 ClientConfigurationData conf = new ClientConfigurationData();
+
+                // Disable memory limit for broker client
+                conf.setMemoryLimitBytes(0);
+
                 conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
                                 ? this.brokerServiceUrlTls : 
this.brokerServiceUrl);
                 
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ff3d248d592..6fe48994bf0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1199,6 +1200,10 @@ public class BrokerService implements Closeable {
                         .enableTcpNoDelay(false)
                         
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
                         .statsInterval(0, TimeUnit.SECONDS);
+
+                // Disable memory limit for replication client
+                clientBuilder.memoryLimit(0, SizeUnit.BYTES);
+
                 if (data.getAuthenticationPlugin() != null && 
data.getAuthenticationParameters() != null) {
                     
clientBuilder.authentication(data.getAuthenticationPlugin(), 
data.getAuthenticationParameters());
                 } else if 
(pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 7930477b4a9..c2bf8a216e2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -92,6 +92,10 @@ public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
 
+    // default limits for producers when memory limit controller is disabled
+    private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 
1000;
+    private static final int 
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
+
     protected final ClientConfigurationData conf;
     private final boolean createdExecutorProviders;
     private LookupService lookup;
@@ -251,7 +255,14 @@ public class PulsarClientImpl implements PulsarClient {
 
     @Override
     public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
-        return new ProducerBuilderImpl<>(this, schema);
+        ProducerBuilderImpl<T> producerBuilder = new 
ProducerBuilderImpl<>(this, schema);
+        if (!memoryLimitController.isMemoryLimited()) {
+            // set default limits for producers when memory limit controller 
is disabled
+            
producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
+            producerBuilder.maxPendingMessagesAcrossPartitions(
+                    
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+        }
+        return producerBuilder;
     }
 
     @Override

Reply via email to