This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 054e27cc8ed7364b5637fc8debf76236fb3d9263
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Wed Nov 11 13:45:01 2020 +0800

    [FLINK-20081][connector/common][source] The SourceCoordinator should fail the job instead of killing JM when it catches an unhandled exception.
---
 .../coordinator/SourceCoordinatorProvider.java      | 11 +++++++----
 .../coordinator/SourceCoordinatorProviderTest.java  | 21 +++++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 0304a88..4047bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
-import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -67,7 +66,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
        public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) throws Exception  {
                final String coordinatorThreadName = "SourceCoordinator-" + 
operatorName;
                CoordinatorExecutorThreadFactory coordinatorThreadFactory =
-                               new 
CoordinatorExecutorThreadFactory(coordinatorThreadName);
+                               new 
CoordinatorExecutorThreadFactory(coordinatorThreadName, context);
                ExecutorService coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
                SimpleVersionedSerializer<SplitT> splitSerializer = 
source.getSplitSerializer();
                SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
@@ -81,10 +80,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
         */
        public static class CoordinatorExecutorThreadFactory implements 
ThreadFactory {
                private final String coordinatorThreadName;
+               private final OperatorCoordinator.Context context;
                private Thread t;
 
-               CoordinatorExecutorThreadFactory(String coordinatorThreadName) {
+               CoordinatorExecutorThreadFactory(
+                               String coordinatorThreadName,
+                               OperatorCoordinator.Context context) {
                        this.coordinatorThreadName = coordinatorThreadName;
+                       this.context = context;
                        this.t = null;
                }
 
@@ -95,7 +98,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
                                                "SingleThreadExecutor.");
                        }
                        t = new Thread(r, coordinatorThreadName);
-                       
t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+                       t.setUncaughtExceptionHandler((thread, throwable) -> 
context.failJob(throwable));
                        return t;
                }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 3e736bb..c0fc3a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.source.coordinator;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
@@ -96,4 +98,23 @@ public class SourceCoordinatorProviderTest {
                                
restoredSourceCoordinator.getContext().registeredReaders().get(0));
        }
 
+       @Test
+       public void testCallAsyncExceptionFailsJob() throws Exception {
+               MockOperatorCoordinatorContext context =
+                       new MockOperatorCoordinatorContext(OPERATOR_ID, 
NUM_SPLITS);
+               RecreateOnResetOperatorCoordinator coordinator =
+                       (RecreateOnResetOperatorCoordinator) 
provider.create(context);
+               SourceCoordinator<?, ?> sourceCoordinator =
+                       (SourceCoordinator<?, ?>) 
coordinator.getInternalCoordinator();
+               sourceCoordinator.getContext().callAsync(
+                       () -> null,
+                       (ignored, e) -> {
+                               throw new RuntimeException();
+                       });
+               CommonTestUtils.waitUtil(
+                       context::isJobFailed,
+                       Duration.ofSeconds(10L),
+                       "The job did not fail before timeout.");
+       }
+
 }

Reply via email to