This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec52320f16b [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
ec52320f16b is described below

commit ec52320f16b1f7e04f4cef16dc3082779dfd4d50
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 27 10:52:13 2022 +0300

    [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java   | 6 ++++--
 .../src/main/java/org/apache/pulsar/compaction/CompactorTool.java   | 4 +++-
 .../java/org/apache/pulsar/functions/instance/InstanceUtils.java    | 4 +++-
 .../pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java   | 5 ++++-
 .../main/java/org/apache/pulsar/functions/worker/WorkerUtils.java   | 5 ++++-
 .../src/main/java/org/apache/pulsar/websocket/WebSocketService.java | 2 ++
 6 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 38fda886793..d3811343e4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -1259,8 +1260,9 @@ public class NamespaceService implements AutoCloseable {
         return namespaceClients.computeIfAbsent(cluster, key -> {
             try {
                 ClientBuilder clientBuilder = PulsarClient.builder()
-                    .enableTcpNoDelay(false)
-                    .statsInterval(0, TimeUnit.SECONDS);
+                        .memoryLimit(0, SizeUnit.BYTES)
+                        .enableTcpNoDelay(false)
+                        .statsInterval(0, TimeUnit.SECONDS);
 
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                     
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 8e397afcf91..562b69c05e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -103,7 +104,8 @@ public class CompactorTool {
             );
         }
 
-        ClientBuilder clientBuilder = PulsarClient.builder();
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES);
 
         if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
             
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 1ba4ffa473a..7eb21093d90 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -157,7 +157,9 @@ public class InstanceUtils {
                                                           Optional<Long> 
memoryLimit) throws PulsarClientException {
         ClientBuilder clientBuilder = null;
         if (isNotBlank(pulsarServiceUrl)) {
-            clientBuilder = 
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
             if (authConfig != null) {
                 if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                         && 
isNotBlank(authConfig.getClientAuthenticationParameters())) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index f13fd0c646e..430344ff22c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mockStatic;
@@ -79,7 +81,7 @@ public class ThreadRuntimeFactoryTest {
 
         ClientBuilder clientBuilder = testMemoryLimit(null, null);
 
-        Mockito.verify(clientBuilder, 
Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
+        Mockito.verify(clientBuilder, 
Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
     }
 
     @Test
@@ -110,6 +112,7 @@ public class ThreadRuntimeFactoryTest {
             ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
             mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i 
-> clientBuilder);
             
doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+            doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(), 
any());
 
             ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new 
ThreadRuntimeFactoryConfig();
             threadRuntimeFactoryConfig.setThreadGroupName("foo");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index aa8ba57acdb..899e42b6589 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
@@ -263,7 +264,9 @@ public final class WorkerUtils {
                                                Boolean 
enableTlsHostnameVerificationEnable) {
 
         try {
-            ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            ClientBuilder clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
 
             if (isNotBlank(authPlugin)
                     && isNotBlank(authParams)) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index ab17239da02..a0d477953d4 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -171,6 +172,7 @@ public class WebSocketService implements Closeable {
 
     private PulsarClient createClientInstance(ClusterData clusterData) throws 
IOException {
         ClientBuilder clientBuilder = PulsarClient.builder() //
+                .memoryLimit(0, SizeUnit.BYTES)
                 .statsInterval(0, TimeUnit.SECONDS) //
                 .enableTls(config.isTlsEnabled()) //
                 
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //

Reply via email to