This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a40684a9c78e426b3b1498de1e8c71380fabad2a
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 23 00:18:13 2020 +0100

    [hotfix] Use 'ExecutorThreadFactory' for Source Coordinator worker threads.

    This deduplicates some code.
---
 .../runtime/source/coordinator/SourceCoordinatorContext.java | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 8bb46d6..9199eeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.DataInputStream;
@@ -47,7 +48,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import java.util.function.BiConsumer;
@@ -116,13 +116,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         this.assignmentTracker = splitAssignmentTracker;
         this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
         this.notifier = new ExecutorNotifier(
-                Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
-                    private int index = 0;
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        return new Thread(r, coordinatorThreadName + "-worker-" + index++);
-                    }
-                }),
+                Executors.newScheduledThreadPool(numWorkerThreads, new ExecutorThreadFactory(coordinatorThreadName + "-worker")),
                 coordinatorExecutor);
     }
