This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b018bd89e557ffc38c5072231db06bde899ccb75
Author: ZhangJian He <[email protected]>
AuthorDate: Wed Oct 20 15:57:40 2021 +0800

    Allow to config pulsar client allocator out of memory policy (#12200)
    
    
    (cherry picked from commit 3641f29ef9b98bf0a3995dcc413374b1481d4f46)
---
 pom.xml                                            |  1 +
 .../common/allocator/PulsarByteBufAllocator.java   |  4 ++
 .../PulsarByteBufAllocatorDefaultTest.java         | 58 ++++++++++++++++++++
 ...ulsarByteBufAllocatorOomThrowExceptionTest.java | 63 ++++++++++++++++++++++
 4 files changed, 126 insertions(+)

diff --git a/pom.xml b/pom.xml
index 52c3ca6..8befa9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1261,6 +1261,7 @@ flexible messaging model and an intuitive client 
API.</description>
             -Dpulsar.allocator.pooled=false
             -Dpulsar.allocator.leak_detection=Advanced
             -Dpulsar.allocator.exit_on_oom=false
+            -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
             -Dio.netty.tryReflectionSetAccessible=true
             ${test.additional.args}
           </argLine>
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
index 5f05640..dcb2df5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
@@ -27,6 +27,7 @@ import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 
 /**
@@ -39,6 +40,7 @@ public class PulsarByteBufAllocator {
     public static final String PULSAR_ALLOCATOR_POOLED = 
"pulsar.allocator.pooled";
     public static final String PULSAR_ALLOCATOR_EXIT_ON_OOM = 
"pulsar.allocator.exit_on_oom";
     public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = 
"pulsar.allocator.leak_detection";
+    public static final String PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY = 
"pulsar.allocator.out_of_memory_policy";
 
     public static final ByteBufAllocator DEFAULT;
 
@@ -53,6 +55,7 @@ public class PulsarByteBufAllocator {
     static {
         boolean isPooled = 
"true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_POOLED, "true"));
         EXIT_ON_OOM = 
"true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_EXIT_ON_OOM, 
"false"));
+        OutOfMemoryPolicy outOfMemoryPolicy = 
OutOfMemoryPolicy.valueOf(System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY,
 "FallbackToHeap"));
 
         LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy
                 .valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, 
"Disabled"));
@@ -85,6 +88,7 @@ public class PulsarByteBufAllocator {
         } else {
             builder.poolingPolicy(PoolingPolicy.UnpooledHeap);
         }
+        builder.outOfMemoryPolicy(outOfMemoryPolicy);
 
         DEFAULT = builder.build();
     }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
new file mode 100644
index 0000000..503a17d
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.allocator;
+
+import io.netty.buffer.ByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({ByteBufAllocatorImpl.class, 
ByteBufAllocatorBuilderImpl.class})
+@PowerMockIgnore({"javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*"})
+@Slf4j
+public class PulsarByteBufAllocatorDefaultTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testDefaultConfig() throws Exception {
+        final ByteBufAllocatorImpl mockAllocator = 
PowerMockito.mock(ByteBufAllocatorImpl.class);
+        
PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator);
+        final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) 
PulsarByteBufAllocator.DEFAULT;
+        // use the variable, in case the compiler optimization
+        log.trace("{}", byteBufAllocator);
+        
PowerMockito.verifyNew(ByteBufAllocatorImpl.class).withArguments(Mockito.any(ByteBufAllocator.class),
 Mockito.any(),
+                Mockito.eq(PoolingPolicy.PooledDirect), Mockito.any(), 
Mockito.eq(OutOfMemoryPolicy.FallbackToHeap),
+                Mockito.any(), Mockito.eq(LeakDetectionPolicy.Advanced));
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
new file mode 100644
index 0000000..a23e6899
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.allocator;
+
+import io.netty.buffer.ByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({ByteBufAllocatorImpl.class, 
ByteBufAllocatorBuilderImpl.class})
+@PowerMockIgnore({"javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*"})
+@Slf4j
+public class PulsarByteBufAllocatorOomThrowExceptionTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testDefaultConfig() throws Exception {
+        try {
+            System.setProperty("pulsar.allocator.out_of_memory_policy", 
"ThrowException");
+            final ByteBufAllocatorImpl mockAllocator = 
PowerMockito.mock(ByteBufAllocatorImpl.class);
+            
PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator);
+            final ByteBufAllocatorImpl byteBufAllocator = 
(ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT;
+            // use the variable, in case the compiler optimization
+            log.trace("{}", byteBufAllocator);
+            
PowerMockito.verifyNew(ByteBufAllocatorImpl.class).withArguments(Mockito.any(ByteBufAllocator.class),
 Mockito.any(),
+                    Mockito.eq(PoolingPolicy.PooledDirect), Mockito.any(), 
Mockito.eq(OutOfMemoryPolicy.ThrowException),
+                    Mockito.any(), Mockito.eq(LeakDetectionPolicy.Advanced));
+        } finally {
+            System.clearProperty("pulsar.allocator.out_of_memory_policy");
+        }
+    }
+
+}

Reply via email to