This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d264f27a71 [Feature][Zeta] Add TaskGroupLocation to the thread name of the TaskExecutionService (#6095)
d264f27a71 is described below

commit d264f27a718cce8c74990d3a86abec249647f978
Author: ic4y <[email protected]>
AuthorDate: Fri Dec 29 10:53:50 2023 +0800

    [Feature][Zeta] Add TaskGroupLocation to the thread name of the TaskExecutionService (#6095)
---
 .../engine/server/TaskExecutionService.java        | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 0645ae25ce..482c4d6712 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -230,6 +230,13 @@ public class TaskExecutionService implements DynamicMetricsProvider {
                                         new BlockingWorker(
                                                 new TaskTracker(t, taskGroupExecutionTracker),
                                                 startedLatch))
+                        .map(
+                                r ->
+                                        new NamedTaskWrapper(
+                                                r,
+                                                "BlockingWorker-"
+                                                        + taskGroupExecutionTracker.taskGroup
+                                                                .getTaskGroupLocation()))
                         .map(executorService::submit)
                         .collect(toList());
 
@@ -911,4 +918,26 @@ public class TaskExecutionService implements DynamicMetricsProvider {
     public ServerConnectorPackageClient getServerConnectorPackageClient() {
         return serverConnectorPackageClient;
     }
+
+    public static class NamedTaskWrapper implements Runnable {
+        private final Runnable task;
+        private final String threadName;
+
+        public NamedTaskWrapper(Runnable task, String threadName) {
+            this.task = task;
+            this.threadName = threadName;
+        }
+
+        @Override
+        public void run() {
+            Thread currentThread = Thread.currentThread();
+            String originalName = currentThread.getName();
+            try {
+                currentThread.setName(threadName);
+                task.run();
+            } finally {
+                currentThread.setName(originalName);
+            }
+        }
+    }
 }

Reply via email to