This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47c2d97499ed95a33c071e916eb8bde7b179e879
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Wed Nov 11 13:46:54 2020 +0800

    [FLINK-20081][connector/common][source] Fix the executor notifier to let 
the handler run in main thread when handling exception from the callable.
    
    This closes #14030
---
 .../connector/source/SplitEnumeratorContext.java   |  4 ++++
 .../source/coordinator/ExecutorNotifier.java       |  5 ++---
 .../source/coordinator/ExecutorNotifierTest.java   | 25 +++++++++++++++++++++-
 .../coordinator/SourceCoordinatorContextTest.java  |  3 ++-
 .../coordinator/SourceCoordinatorTestBase.java     |  3 ++-
 5 files changed, 34 insertions(+), 6 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 8ec8618..c85700c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -92,6 +92,8 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
         * the states that will be a part of the {@link 
SplitEnumerator#snapshotState()}. Otherwise the
         * there might be unexpected behavior.
         *
+        * <p>Note that an exception thrown from the handler would result in 
failing the job.
+        *
         * @param callable a callable to call.
         * @param handler a handler that handles the return value of or the 
exception thrown from the callable.
         */
@@ -106,6 +108,8 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
         * the states that will be a part of the {@link 
SplitEnumerator#snapshotState()}. Otherwise the
         * there might be unexpected behavior.
         *
+        * <p>Note that an exception thrown from the handler would result in 
failing the job.
+        *
         * @param callable the callable to call.
         * @param handler a handler that handles the return value of or the 
exception thrown from the callable.
         * @param initialDelay the initial delay of calling the callable.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index c865df5..35b9a33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -83,8 +83,7 @@ public class ExecutorNotifier implements AutoCloseable {
                                T result = callable.call();
                                executorToNotify.execute(() -> 
handler.accept(result, null));
                        } catch (Throwable t) {
-                               LOG.error("Unexpected exception {}", t);
-                               handler.accept(null, t);
+                               executorToNotify.execute(() -> 
handler.accept(null, t));
                        }
                });
        }
@@ -133,7 +132,7 @@ public class ExecutorNotifier implements AutoCloseable {
                                T result = callable.call();
                                executorToNotify.execute(() -> 
handler.accept(result, null));
                        } catch (Throwable t) {
-                               handler.accept(null, t);
+                               executorToNotify.execute(() -> 
handler.accept(null, t));
                        }
                }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
index bbd9a7d..f5f179a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -94,7 +94,30 @@ public class ExecutorNotifierTest {
        }
 
        @Test
-       public void testExceptionInHandler() throws InterruptedException {
+       public void testExceptionInHandlerWhenHandlingException() throws 
InterruptedException {
+               Exception exception1 = new Exception("Expected exception.");
+               RuntimeException exception2 =  new RuntimeException("Expected 
exception.");
+               CountDownLatch latch = new CountDownLatch(1);
+               notifier.notifyReadyAsync(
+                       () -> {
+                               throw exception1;
+                       },
+                       (v, e) -> {
+                               assertEquals(exception1, e);
+                               assertNull(v);
+                               latch.countDown();
+                               throw exception2;
+                       });
+               latch.await();
+               closeExecutorToNotify();
+               // The uncaught exception handler may fire after the executor 
has shutdown.
+               // We need to wait on the countdown latch here.
+               exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
+               assertEquals(exception2, exceptionInHandler);
+       }
+
+       @Test
+       public void testExceptionInHandlerWhenHandlingResult() throws 
InterruptedException {
                CountDownLatch latch = new CountDownLatch(1);
                RuntimeException exception =  new RuntimeException("Expected 
exception.");
                notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 2441192..280f6a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -144,7 +144,8 @@ public class SourceCoordinatorContextTest extends 
SourceCoordinatorTestBase {
                SourceCoordinatorContext<MockSourceSplit> restoredContext;
                SplitAssignmentTracker<MockSourceSplit> restoredTracker = new 
SplitAssignmentTracker<>();
                SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory =
-                               new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString());
+                               new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                       TEST_OPERATOR_ID.toHexString(), 
operatorCoordinatorContext);
                try (ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes);
                                DataInputStream in = new DataInputStream(bais)) 
{
                        restoredContext = new SourceCoordinatorContext<>(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index a46b1ad..22fb4a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -57,7 +57,8 @@ public abstract class SourceCoordinatorTestBase {
                splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
                String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
                SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory =
-                               new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName);
+                               new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                       coordinatorThreadName, 
operatorCoordinatorContext);
                coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
                context = new SourceCoordinatorContext<>(
                                coordinatorExecutor,

Reply via email to