This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 1d85f6ffc00 [FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.
1d85f6ffc00 is described below

commit 1d85f6ffc00789e0239f8ed7164af03b81e8dfae
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon May 8 14:25:28 2023 +0200

    [FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.
    
    This was added to avoid missing errors that were caused by the error handling code.
    
    Signed-off-by: Matthias Pohl <[email protected]>
---
 .../apache/flink/util/concurrent/FutureUtils.java  | 27 +++++++++++++++--
 .../flink/util/concurrent/FutureUtilsTest.java     | 34 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
index cc952da6bab..f1bc4c5c5b1 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.util.concurrent;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
@@ -1209,12 +1210,34 @@ public class FutureUtils {
     public static void handleUncaughtException(
             CompletableFuture<?> completableFuture,
             Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+        handleUncaughtException(
+                completableFuture, uncaughtExceptionHandler, 
FatalExitExceptionHandler.INSTANCE);
+    }
+
+    @VisibleForTesting
+    static void handleUncaughtException(
+            CompletableFuture<?> completableFuture,
+            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
+            Thread.UncaughtExceptionHandler fatalErrorHandler) {
         checkNotNull(completableFuture)
                 .whenComplete(
                         (ignored, throwable) -> {
                             if (throwable != null) {
-                                uncaughtExceptionHandler.uncaughtException(
-                                        Thread.currentThread(), throwable);
+                                final Thread currentThread = 
Thread.currentThread();
+                                try {
+                                    uncaughtExceptionHandler.uncaughtException(
+                                            currentThread, throwable);
+                                } catch (Throwable t) {
+                                    final RuntimeException 
errorHandlerException =
+                                            new IllegalStateException(
+                                                    "An error occurred while 
executing the error handling for a "
+                                                            + 
throwable.getClass().getSimpleName()
+                                                            + ".",
+                                                    t);
+                                    
errorHandlerException.addSuppressed(throwable);
+                                    fatalErrorHandler.uncaughtException(
+                                            currentThread, 
errorHandlerException);
+                                }
                             }
                         });
     }
diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
index a9e0e47f9f0..3b524aea6ac 100644
--- a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
@@ -55,6 +55,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.emptyArray;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
@@ -814,6 +815,39 @@ public class FutureUtilsTest extends TestLogger {
         assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
     }
 
+    /**
+     * Tests the behavior of {@link 
FutureUtils#handleUncaughtException(CompletableFuture,
+     * Thread.UncaughtExceptionHandler)} with a custom fallback exception 
handler to avoid
+     * triggering {@code System.exit}.
+     */
+    @Test
+    public void testHandleUncaughtExceptionWithBuggyErrorHandlingCode() {
+        final Exception actualProductionCodeError =
+                new Exception(
+                        "Actual production code error that should be caught by 
the error handler.");
+
+        final RuntimeException errorHandlingException =
+                new RuntimeException("Expected test error in error handling 
code.");
+        final Thread.UncaughtExceptionHandler buggyActualExceptionHandler =
+                (thread, ignoredActualException) -> {
+                    throw errorHandlingException;
+                };
+
+        final AtomicReference<Throwable> caughtErrorHandlingException = new 
AtomicReference<>();
+        final Thread.UncaughtExceptionHandler fallbackExceptionHandler =
+                (thread, errorHandlingEx) -> 
caughtErrorHandlingException.set(errorHandlingEx);
+
+        FutureUtils.handleUncaughtException(
+                FutureUtils.completedExceptionally(actualProductionCodeError),
+                buggyActualExceptionHandler,
+                fallbackExceptionHandler);
+
+        final Throwable actualError = caughtErrorHandlingException.get();
+        assertThat(actualError, instanceOf(IllegalStateException.class));
+        assertThat(actualError.getCause(), is(errorHandlingException));
+        assertThat(actualError.getSuppressed(), 
arrayContaining(actualProductionCodeError));
+    }
+
     private static class TestingUncaughtExceptionHandler
             implements Thread.UncaughtExceptionHandler {
 

Reply via email to