This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 1d85f6ffc00 [FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.
1d85f6ffc00 is described below
commit 1d85f6ffc00789e0239f8ed7164af03b81e8dfae
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon May 8 14:25:28 2023 +0200
[FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.
This was added to avoid missing errors that were caused by the error
handling code.
Signed-off-by: Matthias Pohl <[email protected]>
---
.../apache/flink/util/concurrent/FutureUtils.java | 27 +++++++++++++++--
.../flink/util/concurrent/FutureUtilsTest.java | 34 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
index cc952da6bab..f1bc4c5c5b1 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.util.concurrent;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
@@ -1209,12 +1210,34 @@ public class FutureUtils {
public static void handleUncaughtException(
CompletableFuture<?> completableFuture,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+ handleUncaughtException(
+ completableFuture, uncaughtExceptionHandler, FatalExitExceptionHandler.INSTANCE);
+ }
+
+ @VisibleForTesting
+ static void handleUncaughtException(
+ CompletableFuture<?> completableFuture,
+ Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
+ Thread.UncaughtExceptionHandler fatalErrorHandler) {
checkNotNull(completableFuture)
.whenComplete(
(ignored, throwable) -> {
if (throwable != null) {
- uncaughtExceptionHandler.uncaughtException(
- Thread.currentThread(), throwable);
+ final Thread currentThread = Thread.currentThread();
+ try {
+ uncaughtExceptionHandler.uncaughtException(
+ currentThread, throwable);
+ } catch (Throwable t) {
+ final RuntimeException errorHandlerException =
+ new IllegalStateException(
+ "An error occurred while executing the error handling for a "
+ + throwable.getClass().getSimpleName()
+ + ".",
+ t);
+ errorHandlerException.addSuppressed(throwable);
+ fatalErrorHandler.uncaughtException(
+ currentThread, errorHandlerException);
+ }
}
});
}
diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
index a9e0e47f9f0..3b524aea6ac 100644
--- a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
@@ -55,6 +55,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.emptyArray;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
@@ -814,6 +815,39 @@ public class FutureUtilsTest extends TestLogger {
assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
}
+ /**
+ * Tests the behavior of {@link FutureUtils#handleUncaughtException(CompletableFuture,
+ * Thread.UncaughtExceptionHandler)} with a custom fallback exception handler to avoid
+ * triggering {@code System.exit}.
+ */
+ @Test
+ public void testHandleUncaughtExceptionWithBuggyErrorHandlingCode() {
+ final Exception actualProductionCodeError =
+ new Exception(
+ "Actual production code error that should be caught by the error handler.");
+
+ final RuntimeException errorHandlingException =
+ new RuntimeException("Expected test error in error handling code.");
+ final Thread.UncaughtExceptionHandler buggyActualExceptionHandler =
+ (thread, ignoredActualException) -> {
+ throw errorHandlingException;
+ };
+
+ final AtomicReference<Throwable> caughtErrorHandlingException = new AtomicReference<>();
+ final Thread.UncaughtExceptionHandler fallbackExceptionHandler =
+ (thread, errorHandlingEx) -> caughtErrorHandlingException.set(errorHandlingEx);
+
+ FutureUtils.handleUncaughtException(
+ FutureUtils.completedExceptionally(actualProductionCodeError),
+ buggyActualExceptionHandler,
+ fallbackExceptionHandler);
+
+ final Throwable actualError = caughtErrorHandlingException.get();
+ assertThat(actualError, instanceOf(IllegalStateException.class));
+ assertThat(actualError.getCause(), is(errorHandlingException));
+ assertThat(actualError.getSuppressed(), arrayContaining(actualProductionCodeError));
+ }
+
private static class TestingUncaughtExceptionHandler
implements Thread.UncaughtExceptionHandler {