This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new d3bb555 [FLINK-25611][core] Remove CoordinatorExecutorThreadFactory
thread creation guards
d3bb555 is described below
commit d3bb5556c7f5330fa76f045d74a3200b2cc45d91
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jan 11 12:55:44 2022 +0100
[FLINK-25611][core] Remove CoordinatorExecutorThreadFactory thread creation
guards
---
.../source/coordinator/SourceCoordinatorProvider.java | 17 -----------------
1 file changed, 17 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 56248f6..9ffb984 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -96,8 +96,6 @@ public class SourceCoordinatorProvider<SplitT extends
SourceSplit>
@Nullable private Thread t;
- @Nullable private volatile Throwable previousFailureReason;
-
CoordinatorExecutorThreadFactory(
final String coordinatorThreadName, final ClassLoader
contextClassLoader) {
this(coordinatorThreadName, contextClassLoader,
FatalExitExceptionHandler.INSTANCE);
@@ -115,18 +113,6 @@ public class SourceCoordinatorProvider<SplitT extends
SourceSplit>
@Override
public synchronized Thread newThread(Runnable r) {
- if (t != null && t.isAlive()) {
- throw new Error(
- "Source Coordinator Thread already exists. There
should never be more than one "
- + "thread driving the actions of a Source
Coordinator. Existing Thread: "
- + t);
- }
- if (t != null && previousFailureReason != null) {
- throw new Error(
- "The following fatal error has happened in a
previously spawned "
- + "Source Coordinator thread. No new thread
can be spawned.",
- previousFailureReason);
- }
t = new Thread(r, coordinatorThreadName);
t.setContextClassLoader(cl);
t.setUncaughtExceptionHandler(this);
@@ -135,9 +121,6 @@ public class SourceCoordinatorProvider<SplitT extends
SourceSplit>
@Override
public synchronized void uncaughtException(Thread t, Throwable e) {
- if (previousFailureReason == null) {
- previousFailureReason = e;
- }
errorHandler.uncaughtException(t, e);
}