This is an automated email from the ASF dual-hosted git repository.
lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 15b106c761 Allocator support exitOnOutOfMemory config. (#3984)
15b106c761 is described below
commit 15b106c7610f11dfc091c8f73c28d1666113c25f
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Jul 15 14:25:46 2024 +0800
Allocator support exitOnOutOfMemory config. (#3984)
* Allocator support exitOnOutOfMemory config.
---
bookkeeper-common-allocator/pom.xml | 5 ++
.../common/allocator/ByteBufAllocatorBuilder.java | 2 +
.../impl/ByteBufAllocatorBuilderImpl.java | 9 ++-
.../allocator/impl/ByteBufAllocatorImpl.java | 34 +++++++--
.../bookkeeper/common/util/ShutdownUtil.java | 86 ++++++++++++++++++++++
.../bookkeeper/common/util/package-info.java | 21 ++++++
.../impl/ByteBufAllocatorBuilderTest.java | 28 +++++++
.../apache/bookkeeper/bookie/BookieResources.java | 1 +
.../org/apache/bookkeeper/client/BookKeeper.java | 1 +
.../bookkeeper/conf/AbstractConfiguration.java | 10 +++
.../bookkeeper/conf/AbstractConfigurationTest.java | 10 +++
.../tools/perf/journal/JournalWriter.java | 1 +
12 files changed, 201 insertions(+), 7 deletions(-)
diff --git a/bookkeeper-common-allocator/pom.xml
b/bookkeeper-common-allocator/pom.xml
index ecc5699fbd..4cd8416926 100644
--- a/bookkeeper-common-allocator/pom.xml
+++ b/bookkeeper-common-allocator/pom.xml
@@ -29,6 +29,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
index 3e36a23d17..cb244140b3 100644
---
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
+++
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
@@ -92,4 +92,6 @@ public interface ByteBufAllocatorBuilder {
* <p>Default is {@link LeakDetectionPolicy#Disabled}
*/
ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy
leakDetectionPolicy);
+
+ ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory);
}
diff --git
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
index 69c57232af..4b5469a3f7 100644
---
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
+++
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
@@ -37,11 +37,12 @@ public class ByteBufAllocatorBuilderImpl implements
ByteBufAllocatorBuilder {
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
Consumer<OutOfMemoryError> outOfMemoryListener = null;
LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
+ boolean exitOnOutOfMemory = false;
@Override
public ByteBufAllocatorWithOomHandler build() {
return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator,
poolingPolicy, poolingConcurrency,
- outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
+ outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy,
exitOnOutOfMemory);
}
@Override
@@ -86,4 +87,10 @@ public class ByteBufAllocatorBuilderImpl implements
ByteBufAllocatorBuilder {
return this;
}
+ @Override
+ public ByteBufAllocatorBuilder exitOnOutOfMemory(boolean
exitOnOutOfMemory) {
+ this.exitOnOutOfMemory = exitOnOutOfMemory;
+ return this;
+ }
+
}
diff --git
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
index 87582cca92..3bc06f8e7e 100644
---
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
+++
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -29,6 +29,7 @@ import
org.apache.bookkeeper.common.allocator.ByteBufAllocatorWithOomHandler;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,15 +49,24 @@ public class ByteBufAllocatorImpl extends
AbstractByteBufAllocator implements By
private final PoolingPolicy poolingPolicy;
private final OutOfMemoryPolicy outOfMemoryPolicy;
private Consumer<OutOfMemoryError> outOfMemoryListener;
+ private final boolean exitOnOutOfMemory;
ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator
unpooledAllocator,
PoolingPolicy poolingPolicy, int poolingConcurrency,
OutOfMemoryPolicy outOfMemoryPolicy,
Consumer<OutOfMemoryError> outOfMemoryListener,
LeakDetectionPolicy leakDetectionPolicy) {
- super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
+ this(pooledAllocator, unpooledAllocator, poolingPolicy,
poolingConcurrency, outOfMemoryPolicy,
+ outOfMemoryListener, leakDetectionPolicy, false);
+ }
+ ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator
unpooledAllocator,
+ PoolingPolicy poolingPolicy, int poolingConcurrency,
OutOfMemoryPolicy outOfMemoryPolicy,
+ Consumer<OutOfMemoryError> outOfMemoryListener,
+ LeakDetectionPolicy leakDetectionPolicy, boolean
exitOnOutOfMemory) {
+ super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
this.poolingPolicy = poolingPolicy;
this.outOfMemoryPolicy = outOfMemoryPolicy;
+ this.exitOnOutOfMemory = exitOnOutOfMemory;
if (outOfMemoryListener == null) {
this.outOfMemoryListener = (v) -> {
log.error("Unable to allocate memory", v);
@@ -146,7 +156,7 @@ public class ByteBufAllocatorImpl extends
AbstractByteBufAllocator implements By
: unpooledAllocator;
return alloc.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
- outOfMemoryListener.accept(e);
+ consumeOOMError(e);
throw e;
}
}
@@ -166,12 +176,12 @@ public class ByteBufAllocatorImpl extends
AbstractByteBufAllocator implements By
try {
return unpooledAllocator.heapBuffer(initialCapacity,
maxCapacity);
} catch (OutOfMemoryError e2) {
- outOfMemoryListener.accept(e2);
+ consumeOOMError(e2);
throw e2;
}
} else {
// ThrowException
- outOfMemoryListener.accept(e);
+ consumeOOMError(e);
throw e;
}
}
@@ -181,12 +191,24 @@ public class ByteBufAllocatorImpl extends
AbstractByteBufAllocator implements By
try {
return unpooledAllocator.directBuffer(initialCapacity,
maxCapacity);
} catch (OutOfMemoryError e) {
- outOfMemoryListener.accept(e);
- throw e;
+ consumeOOMError(e);
+ throw e;
}
}
}
+ private void consumeOOMError(OutOfMemoryError outOfMemoryError) {
+ try {
+ outOfMemoryListener.accept(outOfMemoryError);
+ } catch (Throwable e) {
+ log.warn("Consume outOfMemory error failed.", e);
+ }
+ if (exitOnOutOfMemory) {
+ log.info("Exiting JVM process for OOM error: {}",
outOfMemoryError.getMessage(), outOfMemoryError);
+ ShutdownUtil.triggerImmediateForcefulShutdown();
+ }
+ }
+
@Override
public boolean isDirectBufferPooled() {
return pooledAllocator != null &&
pooledAllocator.isDirectBufferPooled();
diff --git
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
new file mode 100644
index 0000000000..a398b57fe7
--- /dev/null
+++
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Forked from <a href="https://github.com/apache/pulsar">Pulsar</a>.
+ */
+@Slf4j
+public class ShutdownUtil {
+ private static final Method log4j2ShutdownMethod;
+
+ static {
+ // use reflection to find org.apache.logging.log4j.LogManager.shutdown method
+ Method shutdownMethod = null;
+ try {
+ shutdownMethod =
Class.forName("org.apache.logging.log4j.LogManager")
+ .getMethod("shutdown");
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ // ignore when Log4j2 isn't found, log at debug level
+ log.debug("Cannot find
org.apache.logging.log4j.LogManager.shutdown method", e);
+ }
+ log4j2ShutdownMethod = shutdownMethod;
+ }
+
+ /**
+ * Triggers an immediate forceful shutdown of the current process.
+ *
+ * @param status Termination status. By convention, a nonzero status code indicates abnormal termination.
+ * @see Runtime#halt(int)
+ */
+ public static void triggerImmediateForcefulShutdown(int status) {
+ triggerImmediateForcefulShutdown(status, true);
+ }
+ public static void triggerImmediateForcefulShutdown(int status, boolean
logging) {
+ try {
+ if (status != 0 && logging) {
+ log.warn("Triggering immediate shutdown of current process
with status {}", status,
+ new Exception("Stacktrace for immediate shutdown"));
+ }
+ shutdownLogging();
+ } finally {
+ Runtime.getRuntime().halt(status);
+ }
+ }
+
+ private static void shutdownLogging() {
+ // flush log buffers and shutdown log4j2 logging to prevent log truncation
+ if (log4j2ShutdownMethod != null) {
+ try {
+ // use reflection to call org.apache.logging.log4j.LogManager.shutdown()
+ log4j2ShutdownMethod.invoke(null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ log.error("Unable to call
org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
+ }
+ }
+ }
+
+ /**
+ * Triggers an immediate forceful shutdown of the current process using 1 as the status code.
+ *
+ * @see Runtime#halt(int)
+ */
+ public static void triggerImmediateForcefulShutdown() {
+ triggerImmediateForcefulShutdown(1);
+ }
+}
\ No newline at end of file
diff --git
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
new file mode 100644
index 0000000000..55031dd8f8
--- /dev/null
+++
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the utilities for the allocator used across the project.
+ */
+package org.apache.bookkeeper.common.util;
\ No newline at end of file
diff --git
a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
index 6f2538d6c8..40c41fa65b 100644
---
a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
+++
b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
@@ -35,7 +36,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
/**
* Tests for {@link ByteBufAllocatorBuilderImpl}.
@@ -87,6 +91,30 @@ public class ByteBufAllocatorBuilderTest {
assertEquals(outOfDirectMemException, receivedException.get());
}
+ @Test()
+ public void testOomExit() {
+ ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+ when(baseAlloc.directBuffer(anyInt(),
anyInt())).thenThrow(outOfDirectMemException);
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .pooledAllocator(baseAlloc)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
+ .exitOnOutOfMemory(true)
+ .build();
+
+ MockedStatic<ShutdownUtil> mockedStatic =
mockStatic(ShutdownUtil.class);
+
+ try {
+ alloc.buffer();
+ fail("Should have thrown exception");
+ } catch (OutOfMemoryError e) {
+ // Expected
+ assertEquals(outOfDirectMemException, e);
+ }
+
+ mockedStatic.verify(() ->
ShutdownUtil.triggerImmediateForcefulShutdown(), Mockito.times(1));
+ }
+
@Test
public void testOomWithFallback() {
ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
index c9b71b9968..755efd5be0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
@@ -70,6 +70,7 @@ public class BookieResources {
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 751d40ef53..a42128ec42 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -478,6 +478,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index ea1576a4c7..c369d7b703 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -184,6 +184,7 @@ public abstract class AbstractConfiguration<T extends
AbstractConfiguration>
protected static final String ALLOCATOR_POOLING_CONCURRENCY =
"allocatorPoolingConcurrency";
protected static final String ALLOCATOR_OOM_POLICY =
"allocatorOutOfMemoryPolicy";
protected static final String ALLOCATOR_LEAK_DETECTION_POLICY =
"allocatorLeakDetectionPolicy";
+ protected static final String ALLOCATOR_EXIT_ON_OUT_OF_MEMORY =
"allocatorExitOnOutOfMemory";
// option to limit stats logging
public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
@@ -1157,6 +1158,15 @@ public abstract class AbstractConfiguration<T extends
AbstractConfiguration>
return getThis();
}
+ public T setExitOnOutOfMemory(boolean exitOnOutOfMemory) {
+ this.setProperty(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, exitOnOutOfMemory);
+ return getThis();
+ }
+
+ public boolean exitOnOutOfMemory() {
+ return getBoolean(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, false);
+ }
+
/**
* Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
*
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
index a6333a47d3..194ab2c68d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.conf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
@@ -179,4 +181,12 @@ public class AbstractConfigurationTest {
System.getProperties().put(nettyLevelKey, nettyLevelStr);
}
}
+
+ @Test
+ public void testExitOnOutOfMemory() {
+ assertFalse(conf.exitOnOutOfMemory());
+ conf.setExitOnOutOfMemory(true);
+ assertTrue(conf.exitOnOutOfMemory());
+ }
+
}
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
index 383ccfb982..e287fbd94a 100644
---
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
@@ -495,6 +495,7 @@ public class JournalWriter implements Runnable {
log.error("Unable to allocate memory, exiting bookie", ex);
})
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}