This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a956a49876bb1733bb9354372c25c05d0d96d7be
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Tue Apr 30 14:47:16 2019 +0200

    [FLINK-12219] Log uncaught exceptions and terminate in case Dispatcher#jobReachedGloballyTerminalState fails
    
    FutureUtils#assertNoException will assert that the given future has not
    been completed exceptionally. If it has been completed exceptionally, then
    it will call the FatalExitExceptionHandler.
    
    This commit uses assertNoException to assert that the
    Dispatcher#jobReachedGloballyTerminalState method has not failed.
    
    This closes #8334.
---
 .../flink/runtime/concurrent/FutureUtils.java      | 26 ++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 38 ++++++++-------
 .../flink/runtime/concurrent/FutureUtilsTest.java  | 56 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 1458eab..c1613c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -969,4 +970,29 @@ public class FutureUtils {
                        return DELAYER.schedule(runnable, delay, timeUnit);
                }
        }
+
+       /**
+        * Asserts that the given {@link CompletableFuture} is not completed 
exceptionally. If the future
+        * is completed exceptionally, then it will call the {@link 
FatalExitExceptionHandler}.
+        *
+        * @param completableFuture to assert for no exceptions
+        */
+       public static void assertNoException(CompletableFuture<?> 
completableFuture) {
+               handleUncaughtException(completableFuture, 
FatalExitExceptionHandler.INSTANCE);
+       }
+
+       /**
+        * Checks that the given {@link CompletableFuture} is not completed 
exceptionally. If the future
+        * is completed exceptionally, then it will call the given uncaught 
exception handler.
+        *
+        * @param completableFuture to assert for no exceptions
+        * @param uncaughtExceptionHandler to call if the future is completed 
exceptionally
+        */
+       public static void handleUncaughtException(CompletableFuture<?> 
completableFuture, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+               checkNotNull(completableFuture).whenComplete((ignored, 
throwable) -> {
+                       if (throwable != null) {
+                               
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), throwable);
+                       }
+               });
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a1799d9..c680b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -364,26 +364,32 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
        private JobManagerRunner startJobManagerRunner(JobManagerRunner 
jobManagerRunner) throws Exception {
                final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
-               jobManagerRunner.getResultFuture().whenCompleteAsync(
-                       (ArchivedExecutionGraph archivedExecutionGraph, 
Throwable throwable) -> {
-                               // check if we are still the active 
JobManagerRunner by checking the identity
-                               //noinspection ObjectEquality
-                               if (jobManagerRunner == 
jobManagerRunnerFutures.get(jobId).getNow(null)) {
-                                       if (archivedExecutionGraph != null) {
-                                               
jobReachedGloballyTerminalState(archivedExecutionGraph);
-                                       } else {
-                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
 
-                                               if (strippedThrowable 
instanceof JobNotFinishedException) {
-                                                       jobNotFinished(jobId);
+               FutureUtils.assertNoException(
+                       jobManagerRunner.getResultFuture().handleAsync(
+                               (ArchivedExecutionGraph archivedExecutionGraph, 
Throwable throwable) -> {
+                                       // check if we are still the active 
JobManagerRunner by checking the identity
+                                       final 
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = 
jobManagerRunnerFutures.get(jobId);
+                                       final JobManagerRunner 
currentJobManagerRunner = jobManagerRunnerFuture != null ? 
jobManagerRunnerFuture.getNow(null) : null;
+                                       //noinspection ObjectEquality
+                                       if (jobManagerRunner == 
currentJobManagerRunner) {
+                                               if (archivedExecutionGraph != 
null) {
+                                                       
jobReachedGloballyTerminalState(archivedExecutionGraph);
                                                } else {
-                                                       jobMasterFailed(jobId, 
strippedThrowable);
+                                                       final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+                                                       if (strippedThrowable 
instanceof JobNotFinishedException) {
+                                                               
jobNotFinished(jobId);
+                                                       } else {
+                                                               
jobMasterFailed(jobId, strippedThrowable);
+                                                       }
                                                }
+                                       } else {
+                                               log.debug("There is a newer 
JobManagerRunner for the job {}.", jobId);
                                        }
-                               } else {
-                                       log.debug("There is a newer 
JobManagerRunner for the job {}.", jobId);
-                               }
-                       }, getMainThreadExecutor());
+
+                                       return null;
+                               }, getMainThreadExecutor()));
 
                jobManagerRunner.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index bfbd62e..e16771c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -650,4 +650,60 @@ public class FutureUtilsTest extends TestLogger {
                Assert.assertFalse(runWithExecutor.get());
                Assert.assertTrue(continuationFuture.isDone());
        }
+
+       @Test
+       public void testHandleUncaughtExceptionWithCompletedFuture() {
+               final CompletableFuture<String> future = 
CompletableFuture.completedFuture("foobar");
+               final TestingUncaughtExceptionHandler uncaughtExceptionHandler 
= new TestingUncaughtExceptionHandler();
+
+               FutureUtils.handleUncaughtException(future, 
uncaughtExceptionHandler);
+               assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+       }
+
+       @Test
+       public void testHandleUncaughtExceptionWithNormalCompletion() {
+               final CompletableFuture<String> future = new 
CompletableFuture<>();
+
+               final TestingUncaughtExceptionHandler uncaughtExceptionHandler 
= new TestingUncaughtExceptionHandler();
+
+               FutureUtils.handleUncaughtException(future, 
uncaughtExceptionHandler);
+               future.complete("barfoo");
+               assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+       }
+
+       @Test
+       public void 
testHandleUncaughtExceptionWithExceptionallyCompletedFuture() {
+               final CompletableFuture<String> future = 
FutureUtils.completedExceptionally(new FlinkException("foobar"));
+
+               final TestingUncaughtExceptionHandler uncaughtExceptionHandler 
= new TestingUncaughtExceptionHandler();
+
+               FutureUtils.handleUncaughtException(future, 
uncaughtExceptionHandler);
+               assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
+       }
+
+       @Test
+       public void testHandleUncaughtExceptionWithExceptionallyCompletion() {
+               final CompletableFuture<String> future = new 
CompletableFuture<>();
+
+               final TestingUncaughtExceptionHandler uncaughtExceptionHandler 
= new TestingUncaughtExceptionHandler();
+
+               FutureUtils.handleUncaughtException(future, 
uncaughtExceptionHandler);
+               assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+               future.completeExceptionally(new FlinkException("barfoo"));
+               assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
+       }
+
+       private static class TestingUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
+
+               private Throwable exception = null;
+
+               @Override
+               public void uncaughtException(Thread t, Throwable e) {
+                       exception = e;
+               }
+
+               private boolean hasBeenCalled() {
+                       return exception != null;
+               }
+       }
 }

Reply via email to