This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 17a294d [FLINK-22545][coordination] Fix check during creation of
Source Coordinator thread.
17a294d is described below
commit 17a294daacdc67b1939607a41e67e56af2fa6888
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Jul 13 14:36:32 2021 +0200
[FLINK-22545][coordination] Fix check during creation of Source Coordinator
thread.
The check was meant as a safeguard to prevent re-instantiation after fatal
errors killed a previous thread.
But the check also fired, incorrectly, when the previous thread had merely
been terminated by the executor due to idleness.
This updates the check to fail only if a new thread is instantiated while
the previous thread is still running, or after a previous thread has
crashed.
---
.../coordinator/SourceCoordinatorProvider.java | 33 +++++++++++++++++-----
1 file changed, 26 insertions(+), 7 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 563ed28..1660027 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -27,6 +27,8 @@ import
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+import javax.annotation.Nullable;
+
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -85,13 +87,16 @@ public class SourceCoordinatorProvider<SplitT extends
SourceSplit>
}
/** A thread factory class that provides some helper methods. */
- public static class CoordinatorExecutorThreadFactory implements
ThreadFactory {
+ public static class CoordinatorExecutorThreadFactory
+ implements ThreadFactory, Thread.UncaughtExceptionHandler {
private final String coordinatorThreadName;
private final ClassLoader cl;
private final Thread.UncaughtExceptionHandler errorHandler;
- private Thread t;
+ @Nullable private Thread t;
+
+ @Nullable private volatile Throwable previousFailureReason;
CoordinatorExecutorThreadFactory(
final String coordinatorThreadName, final ClassLoader
contextClassLoader) {
@@ -110,18 +115,32 @@ public class SourceCoordinatorProvider<SplitT extends
SourceSplit>
@Override
public synchronized Thread newThread(Runnable r) {
- if (t != null) {
+ if (t != null && t.isAlive()) {
+ throw new Error(
+ "Source Coordinator Thread already exists. There
should never be more than one "
+ + "thread driving the actions of a Source
Coordinator. Existing Thread: "
+ + t);
+ }
+ if (t != null && previousFailureReason != null) {
throw new Error(
- "This indicates that a fatal error has happened and
caused the "
- + "coordinator executor thread to exit. Check
the earlier logs"
- + "to see the root cause of the problem.");
+ "The following fatal error has happened in a
previously spawned "
+ + "Source Coordinator thread. No new thread
can be spawned.",
+ previousFailureReason);
}
t = new Thread(r, coordinatorThreadName);
t.setContextClassLoader(cl);
- t.setUncaughtExceptionHandler(errorHandler);
+ t.setUncaughtExceptionHandler(this);
return t;
}
+ @Override
+ public synchronized void uncaughtException(Thread t, Throwable e) {
+ if (previousFailureReason == null) {
+ previousFailureReason = e;
+ }
+ errorHandler.uncaughtException(t, e);
+ }
+
String getCoordinatorThreadName() {
return coordinatorThreadName;
}