This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a40684a9c78e426b3b1498de1e8c71380fabad2a
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 23 00:18:13 2020 +0100

    [hotfix] Use 'ExecutorThreadFactory' for Source Coordinator worker threads.
    
    This deduplicates some code.
---
 .../runtime/source/coordinator/SourceCoordinatorContext.java   | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 8bb46d6..9199eeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.DataInputStream;
@@ -47,7 +48,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
@@ -116,13 +116,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
                this.assignmentTracker = splitAssignmentTracker;
                this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
                this.notifier = new ExecutorNotifier(
-                               Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
-                                       private int index = 0;
-                                       @Override
-                                       public Thread newThread(Runnable r) {
-                                               return new Thread(r, coordinatorThreadName + "-worker-" + index++);
-                                       }
-                               }),
+                               Executors.newScheduledThreadPool(numWorkerThreads, new ExecutorThreadFactory(coordinatorThreadName + "-worker")),
                                coordinatorExecutor);
        }
 

Reply via email to