This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82b2c874edf27e5d12ec01fc673c32364e5cfa5b
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Nov 17 23:12:36 2020 +0800

    [FLINK-20193][runtime] Catch all uncaught Throwables from the SplitEnumerator in the SourceCoordinator.
    
    This also makes the error message in CoordinatorExecutorThreadFactory clearer.
    
    This closes #14104
---
 .../source/mocks/MockSplitEnumeratorContext.java   | 257 +++++++++++++++++++++
 .../source/coordinator/SourceCoordinator.java      | 111 +++++----
 .../coordinator/SourceCoordinatorContext.java      |  22 +-
 .../coordinator/SourceCoordinatorProvider.java     |  32 ++-
 .../MockOperatorCoordinatorContext.java            |   7 +
 .../coordinator/SourceCoordinatorContextTest.java  |   1 -
 .../coordinator/SourceCoordinatorProviderTest.java |   1 +
 .../source/coordinator/SourceCoordinatorTest.java  |  51 +++-
 .../coordinator/SourceCoordinatorTestBase.java     |   1 -
 9 files changed, 417 insertions(+), 66 deletions(-)
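
The heart of the change is the new SourceCoordinator#runInEventLoop helper in the patch below: every SplitEnumerator call is routed through the single-threaded coordinator executor, and any uncaught Throwable fails the job instead of silently killing that thread. The following standalone sketch only illustrates that pattern; the class name, the ThrowingAction interface, the Consumer-based failJob callback and the System.err logging are simplified stand-ins, not Flink's actual API.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Standalone sketch of the "catch everything, fail the job" pattern from this commit (not Flink code). */
public class RunInEventLoopSketch {

    /** Stand-in for Flink's ThrowingRunnable: enumerator actions may throw any Throwable. */
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Throwable;
    }

    private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
    private final Consumer<Throwable> failJob; // stand-in for OperatorCoordinator.Context#failJob

    RunInEventLoopSketch(Consumer<Throwable> failJob) {
        this.failJob = failJob;
    }

    /** All enumerator actions go through here, so an uncaught Throwable triggers a job failover
     *  instead of terminating the coordinator thread unnoticed. */
    void runInEventLoop(ThrowingAction action, String actionName) {
        coordinatorExecutor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                // JVM-critical errors are rethrown first; logging or failing the job may no
                // longer succeed at that point (roughly mirrors rethrowIfFatalErrorOrOOM).
                if (t instanceof VirtualMachineError) {
                    throw (VirtualMachineError) t;
                }
                System.err.println("Uncaught exception while " + actionName + ". Triggering job failover.");
                failJob.accept(t);
            }
        });
    }

    void shutdown() {
        coordinatorExecutor.shutdown();
    }

    public static void main(String[] args) {
        RunInEventLoopSketch sketch =
                new RunInEventLoopSketch(cause -> System.out.println("failJob(" + cause + ")"));
        // A throwing "enumerator.start()" no longer kills the thread; the job is failed instead.
        sketch.runInEventLoop(() -> { throw new RuntimeException("boom"); }, "starting the SplitEnumerator");
        sketch.shutdown();
    }
}

Fatal JVM errors are deliberately rethrown before any logging or failJob call, which is the same ordering the patch uses via ExceptionUtils.rethrowIfFatalErrorOrOOM.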

diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
new file mode 100644
index 0000000..19f9ccc
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
@@ -0,0 +1,257 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+/**
+ * A mock class for {@link SplitEnumeratorContext}.
+ */
+public class MockSplitEnumeratorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+       private final Map<Integer, List<SourceEvent>> sentSourceEvent;
+       private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+       private final List<SplitsAssignment<SplitT>> splitsAssignmentSequence;
+       private final ExecutorService workerExecutor;
+       private final ExecutorService mainExecutor;
+       private final TestingExecutorThreadFactory mainThreadFactory;
+       private final AtomicReference<Throwable> errorInWorkerThread;
+       private final AtomicReference<Throwable> errorInMainThread;
+       private final BlockingQueue<Callable<Future<?>>> oneTimeCallables;
+       private final List<Callable<Future<?>>> periodicCallables;
+       private final AtomicBoolean stoppedAcceptAsyncCalls;
+
+       private final int parallelism;
+
+       public MockSplitEnumeratorContext(int parallelism) {
+               this.sentSourceEvent = new HashMap<>();
+               this.registeredReaders = new ConcurrentHashMap<>();
+               this.splitsAssignmentSequence = new ArrayList<>();
+               this.parallelism = parallelism;
+               this.errorInWorkerThread = new AtomicReference<>();
+               this.errorInMainThread = new AtomicReference<>();
+               this.oneTimeCallables = new ArrayBlockingQueue<>(100);
+               this.periodicCallables = Collections.synchronizedList(new ArrayList<>());
+               this.mainThreadFactory = getThreadFactory("SplitEnumerator-main", errorInMainThread);
+               this.workerExecutor = getExecutor(getThreadFactory("SplitEnumerator-worker", errorInWorkerThread));
+               this.mainExecutor = getExecutor(mainThreadFactory);
+               this.stoppedAcceptAsyncCalls = new AtomicBoolean(false);
+       }
+
+       @Override
+       public MetricGroup metricGroup() {
+               return new UnregisteredMetricsGroup();
+       }
+
+       @Override
+       public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+               try {
+                       if (!mainThreadFactory.isCurrentThreadMainExecutorThread()) {
+                               mainExecutor.submit(() ->
+                                               sentSourceEvent.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event)).get();
+                       } else {
+                               sentSourceEvent.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event);
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Failed to assign splits", e);
+               }
+       }
+
+       @Override
+       public int currentParallelism() {
+               return parallelism;
+       }
+
+       @Override
+       public Map<Integer, ReaderInfo> registeredReaders() {
+               return registeredReaders;
+       }
+
+       @Override
+       public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) {
+               splitsAssignmentSequence.add(newSplitAssignments);
+       }
+
+       @Override
+       public void signalNoMoreSplits(int subtask) {}
+
+       @Override
+       public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+               if (stoppedAcceptAsyncCalls.get()) {
+                       return;
+               }
+               oneTimeCallables.add(() ->
+                               workerExecutor.submit(wrap(errorInWorkerThread, () -> {
+                                       try {
+                                               T result = callable.call();
+                                               mainExecutor.submit(wrap(errorInMainThread, () -> handler.accept(result, null))).get();
+                                       } catch (Throwable t) {
+                                               handler.accept(null, t);
+                                       }
+                               })));
+       }
+
+       @Override
+       public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
+               if (stoppedAcceptAsyncCalls.get()) {
+                       return;
+               }
+               periodicCallables.add(() ->
+                               workerExecutor.submit(wrap(errorInWorkerThread, () -> {
+                                       try {
+                                               T result = callable.call();
+                                               mainExecutor.submit(wrap(errorInMainThread, () -> handler.accept(result, null))).get();
+                                       } catch (Throwable t) {
+                                               handler.accept(null, t);
+                                       }
+                               })));
+       }
+
+       @Override
+       public void runInCoordinatorThread(Runnable runnable) {
+               mainExecutor.execute(runnable);
+       }
+
+       public void close() {
+               stoppedAcceptAsyncCalls.set(true);
+       }
+
+       // ------------ helper method to manipulate the context -------------
+
+       public void runNextOneTimeCallable() throws Throwable {
+               oneTimeCallables.take().call().get();
+               checkError();
+       }
+
+       public void runPeriodicCallable(int index) throws Throwable {
+               periodicCallables.get(index).call().get();
+               checkError();
+       }
+
+       public Map<Integer, List<SourceEvent>> getSentSourceEvent() throws Exception {
+               return workerExecutor.submit(() -> new HashMap<>(sentSourceEvent)).get();
+       }
+
+       public void registerReader(ReaderInfo readerInfo) {
+               registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+       }
+
+       public void unregisterReader(int readerId) {
+               registeredReaders.remove(readerId);
+       }
+
+       public List<Callable<Future<?>>> getPeriodicCallables() {
+               return periodicCallables;
+       }
+
+       public BlockingQueue<Callable<Future<?>>> getOneTimeCallables() {
+               return oneTimeCallables;
+       }
+
+       public List<SplitsAssignment<SplitT>> getSplitsAssignmentSequence() {
+               return splitsAssignmentSequence;
+       }
+
+       // ------------- private helpers -------------
+
+       private void checkError() throws Throwable {
+               if (errorInMainThread.get() != null) {
+                       throw errorInMainThread.get();
+               }
+               if (errorInWorkerThread.get() != null) {
+                       throw errorInWorkerThread.get();
+               }
+       }
+
+       private static TestingExecutorThreadFactory getThreadFactory(String threadName, AtomicReference<Throwable> error) {
+               return new TestingExecutorThreadFactory(threadName, error);
+       }
+
+       private static ExecutorService getExecutor(TestingExecutorThreadFactory threadFactory) {
+               return Executors.newSingleThreadScheduledExecutor(threadFactory);
+       }
+
+       private static ThrowableCatchingRunnable wrap(AtomicReference<Throwable> error, Runnable r) {
+               return new ThrowableCatchingRunnable(t -> {
+                       if (!error.compareAndSet(null, t)) {
+                               error.get().addSuppressed(t);
+                       }
+               }, r);
+       }
+
+       // -------- private class -----------
+
+       /**
+        * A thread factory class that provides some helper methods.
+        */
+       public static class TestingExecutorThreadFactory implements ThreadFactory {
+               private final String coordinatorThreadName;
+               private final AtomicReference<Throwable> error;
+               private Thread t;
+
+               TestingExecutorThreadFactory(String coordinatorThreadName, AtomicReference<Throwable> error) {
+                       this.coordinatorThreadName = coordinatorThreadName;
+                       this.error = error;
+                       this.t = null;
+               }
+
+               @Override
+               public Thread newThread(Runnable r) {
+                       if (t != null) {
+                               throw new IllegalStateException("Should never happen. This factory should only be used by a " +
+                                               "SingleThreadExecutor.");
+                       }
+                       t = new Thread(r, coordinatorThreadName);
+                       t.setUncaughtExceptionHandler((t1, e) -> {
+                               if (!error.compareAndSet(null, e)) {
+                                       error.get().addSuppressed(e);
+                               }
+                       });
+                       return t;
+               }
+
+               boolean isCurrentThreadMainExecutorThread() {
+                       return Thread.currentThread() == t;
+               }
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 2685ca6..07ebbdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -32,8 +32,10 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,8 +123,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
                // The start sequence is the first task in the coordinator executor.
                // We rely on the single-threaded coordinator executor to guarantee
                // the other methods are invoked after the enumerator has started.
-               coordinatorExecutor.execute(() -> enumerator.start());
                started = true;
+               runInEventLoop(
+                       () -> enumerator.start(),
+                       "starting the SplitEnumerator."
+               );
        }
 
        @Override
@@ -145,10 +150,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
        }
 
        @Override
-       public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
-               ensureStarted();
-               coordinatorExecutor.execute(() -> {
-                       try {
+       public void handleEventFromOperator(int subtask, OperatorEvent event) {
+               runInEventLoop(
+                       () -> {
                                LOG.debug("Handling event from subtask {} of 
source {}: {}", subtask, operatorName, event);
                                if (event instanceof RequestSplitEvent) {
                                        enumerator.handleSplitRequest(subtask, 
((RequestSplitEvent) event).hostName());
@@ -159,76 +163,63 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
                                } else {
                                        throw new FlinkException("Unrecognized Operator Event: " + event);
                                }
-                       } catch (Exception e) {
-                               LOG.error("Failing the job due to exception when handling operator event {} from subtask {} " +
-                                                               "of source {}.", event, subtask, operatorName, e);
-                               context.failJob(e);
-                       }
-               });
+                       },
+                       "handling operator event %s from subtask %d", event, subtask
+               );
        }
 
        @Override
        public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
-               ensureStarted();
-               coordinatorExecutor.execute(() -> {
-                       try {
+               runInEventLoop(
+                       () -> {
                                LOG.info("Handling subtask {} failure of source 
{}.", subtaskId, operatorName);
                                List<SplitT> splitsToAddBack = 
context.getAndRemoveUncheckpointedAssignment(subtaskId);
                                context.unregisterSourceReader(subtaskId);
                                LOG.debug("Adding {} back to the split 
enumerator of source {}.", splitsToAddBack, operatorName);
                                enumerator.addSplitsBack(splitsToAddBack, 
subtaskId);
-                       } catch (Exception e) {
-                               LOG.error("Failing the job due to exception when handling subtask {} failure in source {}.",
-                                               subtaskId, operatorName, e);
-                               context.failJob(e);
-                       }
-               });
+                       },
+                       "handling subtask %d failure", subtaskId
+               );
        }
 
        @Override
-       public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
-               ensureStarted();
-
-               coordinatorExecutor.execute(() -> {
-                       try {
+       public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+               runInEventLoop(
+                       () -> {
                                LOG.debug("Taking a state snapshot on operator 
{} for checkpoint {}", operatorName, checkpointId);
-                               result.complete(toBytes(checkpointId));
-                       } catch (Exception e) {
-                               result.completeExceptionally(new CompletionException(
-                                               String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e));
-                       }
-               });
+                               try {
+                                       result.complete(toBytes(checkpointId));
+                               } catch (Throwable e) {
+                                       ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
+                                       result.completeExceptionally(new CompletionException(
+                                               String.format("Failed to checkpoint SplitEnumerator for source %s", operatorName), e));
+                               }
+                       },
+                       "taking checkpoint %d", checkpointId
+               );
        }
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
-               ensureStarted();
-               coordinatorExecutor.execute(() -> {
-                       try {
+               runInEventLoop(
+                       () -> {
                                LOG.info("Marking checkpoint {} as completed 
for source {}.", checkpointId, operatorName);
                                context.onCheckpointComplete(checkpointId);
                                
enumerator.notifyCheckpointComplete(checkpointId);
-                       } catch (Exception e) {
-                               LOG.error("Failing the job due to exception when notifying the completion of the "
-                                       + "checkpoint {} for source {}.", checkpointId, operatorName, e);
-                               context.failJob(e);
-                       }
-               });
+                       },
+                       "notifying the enumerator of completion of checkpoint %d", checkpointId
+               );
        }
 
        @Override
        public void notifyCheckpointAborted(long checkpointId) {
-               ensureStarted();
-               coordinatorExecutor.execute(() -> {
-                       try {
+               runInEventLoop(
+                       () -> {
                                LOG.info("Marking checkpoint {} as aborted for 
source {}.", checkpointId, operatorName);
                                
enumerator.notifyCheckpointAborted(checkpointId);
-                       } catch (Exception e) {
-                               LOG.error("Failing the job due to exception when notifying abortion of the "
-                                       + "checkpoint {} for source {}.", checkpointId, operatorName, e);
-                               context.failJob(e);
-                       }
-               });
+                       },
+                       "calling notifyCheckpointAborted()"
+               );
        }
 
        @Override
@@ -245,6 +236,28 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
                }
        }
 
+       private void runInEventLoop(
+                       final ThrowingRunnable<Throwable> action,
+                       final String actionName,
+                       final Object... actionNameFormatParameters) {
+
+               ensureStarted();
+               coordinatorExecutor.execute(() -> {
+                       try {
+                               action.run();
+                       } catch (Throwable t) {
+                               // if we have a JVM critical error, promote it immediately, there is a good chance the
+                               // logging or job failing will not succeed any more
+                               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+                               final String actionString = String.format(actionName, actionNameFormatParameters);
+                               LOG.error("Uncaught exception in the SplitEnumerator for Source {} while {}. Triggering job failover.",
+                                               operatorName, actionString, t);
+                               context.failJob(t);
+                       }
+               });
+       }
+
        // ---------------------------------------------------
        @VisibleForTesting
        SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 9199eeb..6d7d860 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -34,7 +34,12 @@ import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -46,6 +51,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -81,6 +87,9 @@ import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerde
 @Internal
 public class SourceCoordinatorContext<SplitT extends SourceSplit>
                implements SplitEnumeratorContext<SplitT>, AutoCloseable {
+
+       private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
+
        private final ExecutorService coordinatorExecutor;
        private final ExecutorNotifier notifier;
        private final OperatorCoordinator.Context operatorCoordinatorContext;
@@ -115,9 +124,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
                this.registeredReaders = new ConcurrentHashMap<>();
                this.assignmentTracker = splitAssignmentTracker;
                this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
+
+               final Executor errorHandlingCoordinatorExecutor = (runnable) ->
+                               coordinatorExecutor.execute(new ThrowableCatchingRunnable(this::handleUncaughtExceptionFromAsyncCall, runnable));
+
                this.notifier = new ExecutorNotifier(
                                Executors.newScheduledThreadPool(numWorkerThreads, new ExecutorThreadFactory(coordinatorThreadName + "-worker")),
-                               coordinatorExecutor);
+                               errorHandlingCoordinatorExecutor);
        }
 
        @Override
@@ -227,6 +240,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
                operatorCoordinatorContext.failJob(cause);
        }
 
+       void handleUncaughtExceptionFromAsyncCall(Throwable t) {
+               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+               LOG.error("Exception while handling result from async call in {}. Triggering job failover.",
+                               coordinatorThreadName, t);
+               failJob(t);
+       }
+
        /**
         * Take a snapshot of this SourceCoordinatorContext.
         *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index f13aa4e..e019359 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.source.coordinator;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -63,10 +65,10 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
        }
 
        @Override
-       public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception  {
+       public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
                final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
                CoordinatorExecutorThreadFactory coordinatorThreadFactory =
-                               new CoordinatorExecutorThreadFactory(coordinatorThreadName, context, context.getUserCodeClassloader());
+                               new CoordinatorExecutorThreadFactory(coordinatorThreadName, context.getUserCodeClassloader());
                ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
 
                SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
@@ -80,30 +82,40 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
         * A thread factory class that provides some helper methods.
         */
        public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
+
                private final String coordinatorThreadName;
-               private final OperatorCoordinator.Context context;
                private final ClassLoader cl;
+               private final Thread.UncaughtExceptionHandler errorHandler;
+
                private Thread t;
 
                CoordinatorExecutorThreadFactory(
                                final String coordinatorThreadName,
-                               final OperatorCoordinator.Context context,
                                final ClassLoader contextClassLoader) {
+                       this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
+               }
+
+               @VisibleForTesting
+               CoordinatorExecutorThreadFactory(
+                               final String coordinatorThreadName,
+                               final ClassLoader contextClassLoader,
+                               final Thread.UncaughtExceptionHandler errorHandler) {
                        this.coordinatorThreadName = coordinatorThreadName;
-                       this.context = context;
-                       this.t = null;
                        this.cl = contextClassLoader;
+                       this.errorHandler = errorHandler;
                }
 
                @Override
-               public Thread newThread(Runnable r) {
+               public synchronized Thread newThread(Runnable r) {
                        if (t != null) {
-                               throw new IllegalStateException("Should never happen. This factory should only be used by a " +
-                                               "SingleThreadExecutor.");
+                               throw new Error(
+                                       "This indicates that a fatal error has happened and caused the "
+                                               + "coordinator executor thread to exit. Check the earlier logs "
+                                               + "to see the root cause of the problem.");
                        }
                        t = new Thread(r, coordinatorThreadName);
                        t.setContextClassLoader(cl);
-                       t.setUncaughtExceptionHandler((thread, throwable) -> context.failJob(throwable));
+                       t.setUncaughtExceptionHandler(errorHandler);
                        return t;
                }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
index e7184c9..c7f769e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -36,6 +36,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
 
        private final Map<Integer, List<OperatorEvent>> eventsToOperator;
        private boolean jobFailed;
+       private Throwable jobFailureReason;
 
        public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks) {
                this(operatorID, numSubtasks, true);
@@ -61,6 +62,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
                this.numSubtasks = numSubtasks;
                this.eventsToOperator = new HashMap<>();
                this.jobFailed = false;
+               this.jobFailureReason = null;
                this.failEventSending = failEventSending;
                this.userCodeClassLoader = userCodeClassLoader;
        }
@@ -87,6 +89,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
        @Override
        public void failJob(Throwable cause) {
                jobFailed = true;
+               jobFailureReason = cause;
        }
 
        @Override
@@ -112,4 +115,8 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte
        public boolean isJobFailed() {
                return jobFailed;
        }
+
+       public Throwable getJobFailureReason() {
+               return jobFailureReason;
+       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 1228d85..c7a7c9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -151,7 +151,6 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
                SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                                                TEST_OPERATOR_ID.toHexString(),
-                                               operatorCoordinatorContext,
                                                getClass().getClassLoader());
 
                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index cb8bfd8..dbb6a41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class SourceCoordinatorProviderTest {
        private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+       private static final String OPERATOR_NAME = "SourceCoordinatorProviderTest";
        private static final int NUM_SPLITS = 10;
 
        private SourceCoordinatorProvider<MockSourceSplit> provider;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index d8182d4..bdb8c23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -252,6 +253,50 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
        }
 
        @Test
+       public void testFailJobWhenExceptionThrownFromStart() throws Exception {
+               final RuntimeException failureReason = new RuntimeException("Artificial Exception");
+
+               final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> splitEnumerator =
+                               new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) {
+                                       @Override
+                                       public void start() {
+                                               throw failureReason;
+                                       }
+                               };
+
+               final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>(
+                               OPERATOR_NAME, coordinatorExecutor, new EnumeratorCreatingSource<>(() -> splitEnumerator), context);
+
+               coordinator.start();
+               waitUtil(() -> operatorCoordinatorContext.isJobFailed(), Duration.ofSeconds(10),
+                       "The job should have failed due to the artificial exception.");
+               assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason());
+       }
+
+       @Test
+       public void testErrorThrownFromSplitEnumerator() throws Exception {
+               final Error error = new Error("Test Error");
+
+               final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> splitEnumerator =
+                       new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) {
+                               @Override
+                               public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+                                       throw error;
+                               }
+                       };
+
+               final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>(
+                       OPERATOR_NAME, coordinatorExecutor, new EnumeratorCreatingSource<>(() -> splitEnumerator), context);
+
+               coordinator.start();
+               coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {}));
+
+               waitUtil(() -> operatorCoordinatorContext.isJobFailed(), Duration.ofSeconds(10),
+                       "The job should have failed due to the artificial exception.");
+               assertEquals(error, operatorCoordinatorContext.getJobFailureReason());
+       }
+
+       @Test
        public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
                final ClassLoader testClassLoader = new URLClassLoader(new URL[0]);
                final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
@@ -308,13 +353,11 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
        }
 
        private static byte[] createEmptyCheckpoint(long checkpointId) throws Exception {
-               final OperatorCoordinator.Context opContext = new MockOperatorCoordinatorContext(new OperatorID(), 0);
-
                try (SourceCoordinatorContext<MockSourceSplit> emptyContext = new SourceCoordinatorContext<>(
                                Executors.newDirectExecutorService(),
-                               new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory("test", opContext, SourceCoordinatorProviderTest.class.getClassLoader()),
+                               new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory("test", SourceCoordinatorProviderTest.class.getClassLoader()),
                                1,
-                               opContext,
+                               new MockOperatorCoordinatorContext(new OperatorID(), 0),
                                new MockSourceSplitSerializer())) {
 
                        return SourceCoordinator.writeCheckpointBytes(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 87ae8c9..55e680c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -61,7 +61,6 @@ public abstract class SourceCoordinatorTestBase {
                SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                                new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                                                coordinatorThreadName,
-                                               operatorCoordinatorContext,
                                                getClass().getClassLoader());
 
                coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
