This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d3bb555  [FLINK-25611][core] Remove CoordinatorExecutorThreadFactory 
thread creation guards
d3bb555 is described below

commit d3bb5556c7f5330fa76f045d74a3200b2cc45d91
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jan 11 12:55:44 2022 +0100

    [FLINK-25611][core] Remove CoordinatorExecutorThreadFactory thread creation 
guards
---
 .../source/coordinator/SourceCoordinatorProvider.java   | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 56248f6..9ffb984 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -96,8 +96,6 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit>
 
         @Nullable private Thread t;
 
-        @Nullable private volatile Throwable previousFailureReason;
-
         CoordinatorExecutorThreadFactory(
                 final String coordinatorThreadName, final ClassLoader 
contextClassLoader) {
             this(coordinatorThreadName, contextClassLoader, 
FatalExitExceptionHandler.INSTANCE);
@@ -115,18 +113,6 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit>
 
         @Override
         public synchronized Thread newThread(Runnable r) {
-            if (t != null && t.isAlive()) {
-                throw new Error(
-                        "Source Coordinator Thread already exists. There 
should never be more than one "
-                                + "thread driving the actions of a Source 
Coordinator. Existing Thread: "
-                                + t);
-            }
-            if (t != null && previousFailureReason != null) {
-                throw new Error(
-                        "The following fatal error has happened in a 
previously spawned "
-                                + "Source Coordinator thread. No new thread 
can be spawned.",
-                        previousFailureReason);
-            }
             t = new Thread(r, coordinatorThreadName);
             t.setContextClassLoader(cl);
             t.setUncaughtExceptionHandler(this);
@@ -135,9 +121,6 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit>
 
         @Override
         public synchronized void uncaughtException(Thread t, Throwable e) {
-            if (previousFailureReason == null) {
-                previousFailureReason = e;
-            }
             errorHandler.uncaughtException(t, e);
         }
 

Reply via email to