This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a56f04140e9 [Broker] Disable memory limit controller for broker client
and replication clients (#15723)
a56f04140e9 is described below
commit a56f04140e94cefcaad3ce3d92b70c13337227b7
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 25 03:49:21 2022 +0300
[Broker] Disable memory limit controller for broker client and replication
clients (#15723)
- disable memory limit by default for broker client and replication clients
- restore maxPendingMessages and maxPendingMessagesAcrossPartitions when
memory limit is
disabled so that pre-PIP-120 default configuration is restored when limit
is disabled
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++++
.../org/apache/pulsar/broker/service/BrokerService.java | 5 +++++
.../org/apache/pulsar/client/impl/PulsarClientImpl.java | 13 ++++++++++++-
3 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 73843071f98..1c226b2705a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1364,6 +1364,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (this.client == null) {
try {
ClientConfigurationData conf = new ClientConfigurationData();
+
+ // Disable memory limit for broker client
+ conf.setMemoryLimitBytes(0);
+
conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls :
this.brokerServiceUrl);
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ff3d248d592..6fe48994bf0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1199,6 +1200,10 @@ public class BrokerService implements Closeable {
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
+
+ // Disable memory limit for replication client
+ clientBuilder.memoryLimit(0, SizeUnit.BYTES);
+
if (data.getAuthenticationPlugin() != null &&
data.getAuthenticationParameters() != null) {
clientBuilder.authentication(data.getAuthenticationPlugin(),
data.getAuthenticationParameters());
} else if
(pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 7930477b4a9..c2bf8a216e2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -92,6 +92,10 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log =
LoggerFactory.getLogger(PulsarClientImpl.class);
+ // default limits for producers when memory limit controller is disabled
+ private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES =
1000;
+ private static final int
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
+
protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
private LookupService lookup;
@@ -251,7 +255,14 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
- return new ProducerBuilderImpl<>(this, schema);
+ ProducerBuilderImpl<T> producerBuilder = new
ProducerBuilderImpl<>(this, schema);
+ if (!memoryLimitController.isMemoryLimited()) {
+ // set default limits for producers when memory limit controller
is disabled
+
producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
+ producerBuilder.maxPendingMessagesAcrossPartitions(
+
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+ }
+ return producerBuilder;
}
@Override