This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25ec89b8dae175246fd28972f8dbf7d479e89b5f
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Thu Mar 19 16:16:58 2020 +0300

    [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory
---
 .../flink/core/memory/MemorySegmentFactory.java    | 25 +++++++++++++++++++-
 .../java/org/apache/flink/util/ExceptionUtils.java | 27 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c297a26..760d2ac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -19,6 +19,10 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
@@ -32,6 +36,7 @@ import java.nio.ByteBuffer;
  */
 @Internal
 public final class MemorySegmentFactory {
+       private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
 
        /**
         * Creates a new memory segment that targets the given heap memory region.
@@ -94,10 +99,28 @@ public final class MemorySegmentFactory {
         * @return A new memory segment, backed by unpooled off-heap memory.
         */
        public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
-               ByteBuffer memory = ByteBuffer.allocateDirect(size);
+               ByteBuffer memory = allocateDirectMemory(size);
                return new HybridMemorySegment(memory, owner, null);
        }
 
+       private static ByteBuffer allocateDirectMemory(int size) {
+               //noinspection ErrorNotRethrown
+               try {
+                       return ByteBuffer.allocateDirect(size);
+               } catch (OutOfMemoryError outOfMemoryError) {
+                       // TODO: this error handling can be removed in future,
+                       // once we find a common way to handle OOM errors in netty threads.
+                       // Here we enrich it to propagate better OOM message to the receiver
+                       // if it happens in a netty thread.
+                       OutOfMemoryError enrichedOutOfMemoryError = (OutOfMemoryError) ExceptionUtils
+                               .enrichTaskManagerOutOfMemoryError(outOfMemoryError);
+                       if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) {
+                               LOG.error("Cannot allocate direct memory segment", enrichedOutOfMemoryError);
+                       }
+                       throw enrichedOutOfMemoryError;
+               }
+       }
+
        /**
         * Allocates an off-heap unsafe memory and creates a new memory segment to represent that memory.
         *
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 5fc1bfe..0a63ae5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -49,6 +49,19 @@ public final class ExceptionUtils {
        /** The stringified representation of a null exception reference. */
        public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
+       private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format(
+               "Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " +
+                       "a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " +
+                       "allocated by user code or some of its dependencies. In this case '%s' configuration option should be " +
+                       "increased. Flink framework and its dependencies also consume the direct memory, mostly for network " +
+                       "communication. The most of network memory is managed by Flink and should not result in out-of-memory " +
+                       "error. In certain special cases, in particular for jobs with high parallelism, the framework may " +
+                       "require more direct memory which is not managed by Flink. In this case '%s' configuration option " +
+                       "should be increased. If the error persists then there is probably a direct memory leak which has to " +
+                       "be investigated and fixed. The task executor has to be shutdown...",
+               TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(),
+               TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key());
+
        private static final String TM_METASPACE_OOM_ERROR_MESSAGE = 
String.format(
                "Metaspace. The metaspace out-of-memory error has occurred. 
This can mean two things: either the job requires " +
                        "a larger size of JVM metaspace to load classes or 
there is a class loading leak. In the first case " +
@@ -121,7 +134,7 @@ public final class ExceptionUtils {
        /**
         * Generates new {@link OutOfMemoryError} with more detailed message.
         *
-        * <p>This method improves error message for metaspace {@link OutOfMemoryError}.
+        * <p>This method improves error message for direct and metaspace {@link OutOfMemoryError}.
         * It adds description of possible causes and ways of resolution.
         *
         * @param exception The exception to enrich.
@@ -130,6 +143,8 @@ public final class ExceptionUtils {
        public static Throwable enrichTaskManagerOutOfMemoryError(Throwable exception) {
                if (isMetaspaceOutOfMemoryError(exception)) {
                        return changeOutOfMemoryErrorMessage(exception, TM_METASPACE_OOM_ERROR_MESSAGE);
+               } else if (isDirectOutOfMemoryError(exception)) {
+                       return changeOutOfMemoryErrorMessage(exception, TM_DIRECT_OOM_ERROR_MESSAGE);
                }
                return exception;
        }
@@ -155,6 +170,16 @@ public final class ExceptionUtils {
                return isOutOfMemoryErrorWithMessageStartingWith(t, 
"Metaspace");
        }
 
+       /**
+        * Checks whether the given exception indicates a JVM direct out-of-memory error.
+        *
+        * @param t The exception to check.
+        * @return True, if the exception is the direct {@link OutOfMemoryError}, false otherwise.
+        */
+       public static boolean isDirectOutOfMemoryError(Throwable t) {
+               return isOutOfMemoryErrorWithMessageStartingWith(t, "Direct buffer memory");
+       }
+
        private static boolean 
isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) {
                // the exact matching of the class is checked to avoid matching 
any custom subclasses of OutOfMemoryError
                // as we are interested in the original exceptions, generated 
by JVM.

Reply via email to