This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 054e27cc8ed7364b5637fc8debf76236fb3d9263 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Wed Nov 11 13:45:01 2020 +0800 [FLINK-20081][connector/common][source] The SourceCoordinator should fail the job instead of killing JM when it catches an unhandled exception. --- .../coordinator/SourceCoordinatorProvider.java | 11 +++++++---- .../coordinator/SourceCoordinatorProviderTest.java | 21 +++++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index 0304a88..4047bad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -24,7 +24,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator; -import org.apache.flink.runtime.util.FatalExitExceptionHandler; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -67,7 +66,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception { final String coordinatorThreadName = "SourceCoordinator-" + operatorName; CoordinatorExecutorThreadFactory coordinatorThreadFactory = - new CoordinatorExecutorThreadFactory(coordinatorThreadName); + new CoordinatorExecutorThreadFactory(coordinatorThreadName, context); ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer(); SourceCoordinatorContext<SplitT> sourceCoordinatorContext = @@ -81,10 +80,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre */ public static class CoordinatorExecutorThreadFactory implements ThreadFactory { private final String coordinatorThreadName; + private final OperatorCoordinator.Context context; private Thread t; - CoordinatorExecutorThreadFactory(String coordinatorThreadName) { + CoordinatorExecutorThreadFactory( + String coordinatorThreadName, + OperatorCoordinator.Context context) { this.coordinatorThreadName = coordinatorThreadName; + this.context = context; this.t = null; } @@ -95,7 +98,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre "SingleThreadExecutor."); } t = new Thread(r, coordinatorThreadName); - t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE); + t.setUncaughtExceptionHandler((thread, throwable) -> context.failJob(throwable)); return t; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java index 3e736bb..c0fc3a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.source.coordinator; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; @@ -30,6 +31,7 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import static org.junit.Assert.assertEquals; @@ -96,4 +98,23 @@ public class SourceCoordinatorProviderTest { restoredSourceCoordinator.getContext().registeredReaders().get(0)); } + @Test + public void testCallAsyncExceptionFailsJob() throws Exception { + MockOperatorCoordinatorContext context = + new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS); + RecreateOnResetOperatorCoordinator coordinator = + (RecreateOnResetOperatorCoordinator) provider.create(context); + SourceCoordinator<?, ?> sourceCoordinator = + (SourceCoordinator<?, ?>) coordinator.getInternalCoordinator(); + sourceCoordinator.getContext().callAsync( + () -> null, + (ignored, e) -> { + throw new RuntimeException(); + }); + CommonTestUtils.waitUtil( + context::isJobFailed, + Duration.ofSeconds(10L), + "The job did not fail before timeout."); + } + }
