This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a85fdcec5cb3ed311f50206ac650ae0cc0f68b94
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 27 10:52:13 2022 +0300

    [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)

    (cherry picked from commit ec52320f16b1f7e04f4cef16dc3082779dfd4d50)
---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java   | 6 ++++--
 .../src/main/java/org/apache/pulsar/compaction/CompactorTool.java   | 5 +++--
 .../java/org/apache/pulsar/functions/instance/InstanceUtils.java    | 4 +++-
 .../pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java   | 5 ++++-
 .../main/java/org/apache/pulsar/functions/worker/WorkerUtils.java   | 5 ++++-
 .../src/main/java/org/apache/pulsar/websocket/WebSocketService.java | 2 ++
 6 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58abf48d9b0..9cc427aae67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -1253,8 +1254,9 @@ public class NamespaceService implements AutoCloseable {
         return namespaceClients.computeIfAbsent(cluster, key -> {
             try {
                 ClientBuilder clientBuilder = PulsarClient.builder()
-                    .enableTcpNoDelay(false)
-                    .statsInterval(0, TimeUnit.SECONDS);
+                        .memoryLimit(0, SizeUnit.BYTES)
+                        .enableTcpNoDelay(false)
+                        .statsInterval(0, TimeUnit.SECONDS);
 
                 // Apply all arbitrary configuration. This must be called before setting any fields annotated as
                 // @Secret on the ClientConfigurationData object because of the way they are serialized.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 691217d9d77..59dd4c20aab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
@@ -104,8 +105,8 @@ public class CompactorTool {
             );
         }
 
-        ClientBuilder clientBuilder = PulsarClient.builder();
-
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES);
         // Apply all arbitrary configuration. This must be called before setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way they are serialized.
         // See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index d52c7dbefdb..d19e79d4026 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -161,7 +161,9 @@ public class InstanceUtils {
                                                   Optional<Long> memoryLimit) throws PulsarClientException {
         ClientBuilder clientBuilder = null;
         if (isNotBlank(pulsarServiceUrl)) {
-            clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
             if (authConfig != null) {
                 if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                         && isNotBlank(authConfig.getClientAuthenticationParameters())) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index c86e1d5f2d6..9462463c05a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mockStatic;
@@ -79,7 +81,7 @@ public class ThreadRuntimeFactoryTest {
 
         ClientBuilder clientBuilder = testMemoryLimit(null, null);
 
-        Mockito.verify(clientBuilder, Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
+        Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
     }
 
     @Test
@@ -110,6 +112,7 @@ public class ThreadRuntimeFactoryTest {
         ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
         mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
         doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+        doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(), any());
 
         ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new ThreadRuntimeFactoryConfig();
         threadRuntimeFactoryConfig.setThreadGroupName("foo");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index d50d57ee27c..828b4e16516 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -295,7 +296,9 @@ public final class WorkerUtils {
                                                WorkerConfig workerConfig) {
 
         try {
-            ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            ClientBuilder clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
 
             if (workerConfig != null) {
                 // Apply all arbitrary configuration. This must be called before setting any fields annotated as
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index dbac405a0e0..a57c6c491e7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -175,6 +176,7 @@ public class WebSocketService implements Closeable {
 
     private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
         ClientBuilder clientBuilder = PulsarClient.builder() //
+                .memoryLimit(0, SizeUnit.BYTES)
                 .statsInterval(0, TimeUnit.SECONDS) //
                 .enableTls(config.isTlsEnabled()) //
                 .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
