This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ec52320f16b [Broker, Functions, Websocket] Disable memory limit
controller in internal Pulsar clients (#15752)
ec52320f16b is described below
commit ec52320f16b1f7e04f4cef16dc3082779dfd4d50
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 27 10:52:13 2022 +0300
[Broker, Functions, Websocket] Disable memory limit controller in internal
Pulsar clients (#15752)
---
.../java/org/apache/pulsar/broker/namespace/NamespaceService.java | 6 ++++--
.../src/main/java/org/apache/pulsar/compaction/CompactorTool.java | 4 +++-
.../java/org/apache/pulsar/functions/instance/InstanceUtils.java | 4 +++-
.../pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java | 5 ++++-
.../main/java/org/apache/pulsar/functions/worker/WorkerUtils.java | 5 ++++-
.../src/main/java/org/apache/pulsar/websocket/WebSocketService.java | 2 ++
6 files changed, 20 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 38fda886793..d3811343e4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -1259,8 +1260,9 @@ public class NamespaceService implements AutoCloseable {
return namespaceClients.computeIfAbsent(cluster, key -> {
try {
ClientBuilder clientBuilder = PulsarClient.builder()
- .enableTcpNoDelay(false)
- .statsInterval(0, TimeUnit.SECONDS);
+ .memoryLimit(0, SizeUnit.BYTES)
+ .enableTcpNoDelay(false)
+ .statsInterval(0, TimeUnit.SECONDS);
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 8e397afcf91..562b69c05e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -103,7 +104,8 @@ public class CompactorTool {
);
}
- ClientBuilder clientBuilder = PulsarClient.builder();
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES);
if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 1ba4ffa473a..7eb21093d90 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -157,7 +157,9 @@ public class InstanceUtils {
Optional<Long>
memoryLimit) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
- clientBuilder =
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+ clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES)
+ .serviceUrl(pulsarServiceUrl);
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&&
isNotBlank(authConfig.getClientAuthenticationParameters())) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index f13fd0c646e..430344ff22c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test;
import java.util.Map;
import java.util.Optional;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockStatic;
@@ -79,7 +81,7 @@ public class ThreadRuntimeFactoryTest {
ClientBuilder clientBuilder = testMemoryLimit(null, null);
- Mockito.verify(clientBuilder,
Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
+ Mockito.verify(clientBuilder,
Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
}
@Test
@@ -110,6 +112,7 @@ public class ThreadRuntimeFactoryTest {
ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i
-> clientBuilder);
doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+ doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(),
any());
ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new
ThreadRuntimeFactoryConfig();
threadRuntimeFactoryConfig.setThreadGroupName("foo");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index aa8ba57acdb..899e42b6589 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
@@ -263,7 +264,9 @@ public final class WorkerUtils {
Boolean
enableTlsHostnameVerificationEnable) {
try {
- ClientBuilder clientBuilder =
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES)
+ .serviceUrl(pulsarServiceUrl);
if (isNotBlank(authPlugin)
&& isNotBlank(authParams)) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index ab17239da02..a0d477953d4 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -171,6 +172,7 @@ public class WebSocketService implements Closeable {
private PulsarClient createClientInstance(ClusterData clusterData) throws
IOException {
ClientBuilder clientBuilder = PulsarClient.builder() //
+ .memoryLimit(0, SizeUnit.BYTES)
.statsInterval(0, TimeUnit.SECONDS) //
.enableTls(config.isTlsEnabled()) //
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //