This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b018bd89e557ffc38c5072231db06bde899ccb75 Author: ZhangJian He <[email protected]> AuthorDate: Wed Oct 20 15:57:40 2021 +0800 Allow to config pulsar client allocator out of memory policy (#12200) (cherry picked from commit 3641f29ef9b98bf0a3995dcc413374b1481d4f46) --- pom.xml | 1 + .../common/allocator/PulsarByteBufAllocator.java | 4 ++ .../PulsarByteBufAllocatorDefaultTest.java | 58 ++++++++++++++++++++ ...ulsarByteBufAllocatorOomThrowExceptionTest.java | 63 ++++++++++++++++++++++ 4 files changed, 126 insertions(+) diff --git a/pom.xml b/pom.xml index 52c3ca6..8befa9e 100644 --- a/pom.xml +++ b/pom.xml @@ -1261,6 +1261,7 @@ flexible messaging model and an intuitive client API.</description> -Dpulsar.allocator.pooled=false -Dpulsar.allocator.leak_detection=Advanced -Dpulsar.allocator.exit_on_oom=false + -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap -Dio.netty.tryReflectionSetAccessible=true ${test.additional.args} </argLine> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java index 5f05640..dcb2df5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java @@ -27,6 +27,7 @@ import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder; import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; +import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; import org.apache.bookkeeper.common.allocator.PoolingPolicy; /** @@ -39,6 +40,7 @@ public class PulsarByteBufAllocator { public static final String PULSAR_ALLOCATOR_POOLED = "pulsar.allocator.pooled"; public static final String PULSAR_ALLOCATOR_EXIT_ON_OOM = "pulsar.allocator.exit_on_oom"; public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = "pulsar.allocator.leak_detection"; + public static final String PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY = "pulsar.allocator.out_of_memory_policy"; public static final ByteBufAllocator DEFAULT; @@ -53,6 +55,7 @@ public class PulsarByteBufAllocator { static { boolean isPooled = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_POOLED, "true")); EXIT_ON_OOM = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_EXIT_ON_OOM, "false")); + OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf(System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap")); LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy .valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled")); @@ -85,6 +88,7 @@ public class PulsarByteBufAllocator { } else { builder.poolingPolicy(PoolingPolicy.UnpooledHeap); } + builder.outOfMemoryPolicy(outOfMemoryPolicy); DEFAULT = builder.build(); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java new file mode 100644 index 0000000..503a17d --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.allocator; + +import io.netty.buffer.ByteBufAllocator; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; +import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; +import org.apache.bookkeeper.common.allocator.PoolingPolicy; +import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl; +import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +@PrepareForTest({ByteBufAllocatorImpl.class, ByteBufAllocatorBuilderImpl.class}) +@PowerMockIgnore({"javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*"}) +@Slf4j +public class PulsarByteBufAllocatorDefaultTest { + + @ObjectFactory + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @Test + public void testDefaultConfig() throws Exception { + final ByteBufAllocatorImpl mockAllocator = PowerMockito.mock(ByteBufAllocatorImpl.class); + PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator); + final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT; + // use the variable, in case the compiler optimization + log.trace("{}", byteBufAllocator); + PowerMockito.verifyNew(ByteBufAllocatorImpl.class).withArguments(Mockito.any(ByteBufAllocator.class), Mockito.any(), + Mockito.eq(PoolingPolicy.PooledDirect), Mockito.any(), Mockito.eq(OutOfMemoryPolicy.FallbackToHeap), + Mockito.any(), Mockito.eq(LeakDetectionPolicy.Advanced)); + } + +} \ No newline at end of file diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java new file mode 100644 index 0000000..a23e6899 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.allocator; + +import io.netty.buffer.ByteBufAllocator; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; +import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; +import org.apache.bookkeeper.common.allocator.PoolingPolicy; +import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl; +import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +@PrepareForTest({ByteBufAllocatorImpl.class, ByteBufAllocatorBuilderImpl.class}) +@PowerMockIgnore({"javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*"}) +@Slf4j +public class PulsarByteBufAllocatorOomThrowExceptionTest { + + @ObjectFactory + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @Test + public void testDefaultConfig() throws Exception { + try { + System.setProperty("pulsar.allocator.out_of_memory_policy", "ThrowException"); + final ByteBufAllocatorImpl mockAllocator = PowerMockito.mock(ByteBufAllocatorImpl.class); + PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator); + final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT; + // use the variable, in case the compiler optimization + log.trace("{}", byteBufAllocator); + PowerMockito.verifyNew(ByteBufAllocatorImpl.class).withArguments(Mockito.any(ByteBufAllocator.class), Mockito.any(), + Mockito.eq(PoolingPolicy.PooledDirect), Mockito.any(), Mockito.eq(OutOfMemoryPolicy.ThrowException), + Mockito.any(), Mockito.eq(LeakDetectionPolicy.Advanced)); + } finally { + System.clearProperty("pulsar.allocator.out_of_memory_policy"); + } + } + +}
