This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 47c2d97499ed95a33c071e916eb8bde7b179e879 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Wed Nov 11 13:46:54 2020 +0800 [FLINK-20081][connector/common][source] Fix the executor notifier to let the handler run in main thread when handling exception from the callable. This closes #14030 --- .../connector/source/SplitEnumeratorContext.java | 4 ++++ .../source/coordinator/ExecutorNotifier.java | 5 ++--- .../source/coordinator/ExecutorNotifierTest.java | 25 +++++++++++++++++++++- .../coordinator/SourceCoordinatorContextTest.java | 3 ++- .../coordinator/SourceCoordinatorTestBase.java | 3 ++- 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java index 8ec8618..c85700c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java @@ -92,6 +92,8 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> { * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the * there might be unexpected behavior. * + * <p>Note that an exception thrown from the handler would result in failing the job. + * * @param callable a callable to call. * @param handler a handler that handles the return value of or the exception thrown from the callable. */ @@ -106,6 +108,8 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> { * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the * there might be unexpected behavior. * + * <p>Note that an exception thrown from the handler would result in failing the job. + * * @param callable the callable to call. * @param handler a handler that handles the return value of or the exception thrown from the callable. * @param initialDelay the initial delay of calling the callable. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java index c865df5..35b9a33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java @@ -83,8 +83,7 @@ public class ExecutorNotifier implements AutoCloseable { T result = callable.call(); executorToNotify.execute(() -> handler.accept(result, null)); } catch (Throwable t) { - LOG.error("Unexpected exception {}", t); - handler.accept(null, t); + executorToNotify.execute(() -> handler.accept(null, t)); } }); } @@ -133,7 +132,7 @@ public class ExecutorNotifier implements AutoCloseable { T result = callable.call(); executorToNotify.execute(() -> handler.accept(result, null)); } catch (Throwable t) { - handler.accept(null, t); + executorToNotify.execute(() -> handler.accept(null, t)); } }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java index bbd9a7d..f5f179a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java @@ -94,7 +94,30 @@ public class ExecutorNotifierTest { } @Test - public void testExceptionInHandler() throws InterruptedException { + public void testExceptionInHandlerWhenHandlingException() throws InterruptedException { + Exception exception1 = new Exception("Expected exception."); + RuntimeException exception2 = new RuntimeException("Expected exception."); + CountDownLatch latch = new CountDownLatch(1); + notifier.notifyReadyAsync( + () -> { + throw exception1; + }, + (v, e) -> { + assertEquals(exception1, e); + assertNull(v); + latch.countDown(); + throw exception2; + }); + latch.await(); + closeExecutorToNotify(); + // The uncaught exception handler may fire after the executor has shutdown. + // We need to wait on the countdown latch here. + exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS); + assertEquals(exception2, exceptionInHandler); + } + + @Test + public void testExceptionInHandlerWhenHandlingResult() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); RuntimeException exception = new RuntimeException("Expected exception."); notifier.notifyReadyAsync(() -> 1234, (v, e) -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java index 2441192..280f6a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java @@ -144,7 +144,8 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase { SourceCoordinatorContext<MockSourceSplit> restoredContext; SplitAssignmentTracker<MockSourceSplit> restoredTracker = new SplitAssignmentTracker<>(); SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory = - new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString()); + new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory( + TEST_OPERATOR_ID.toHexString(), operatorCoordinatorContext); try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); DataInputStream in = new DataInputStream(bais)) { restoredContext = new SourceCoordinatorContext<>( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index a46b1ad..22fb4a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -57,7 +57,8 @@ public abstract class SourceCoordinatorTestBase { splitSplitAssignmentTracker = new SplitAssignmentTracker<>(); String coordinatorThreadName = TEST_OPERATOR_ID.toHexString(); SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory = - new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName); + new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory( + coordinatorThreadName, operatorCoordinatorContext); coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); context = new SourceCoordinatorContext<>( coordinatorExecutor,
