This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5496a46d0 [Improve][Zeta] Add log to print the each task execute location (#9205)
b5496a46d0 is described below

commit b5496a46d0a060e1509e56ab67b72a0a2edbc6f9
Author: CosmosNi <[email protected]>
AuthorDate: Tue Apr 29 09:50:20 2025 +0800

    [Improve][Zeta] Add log to print the each task execute location (#9205)
---
 .../engine/server/dag/physical/ResourceUtils.java  |  3 ++-
 .../engine/server/dag/physical/SubPlan.java        | 24 +++++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index c5db9909a5..3e2e1eac17 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -38,7 +38,7 @@ public class ResourceUtils {
 
     private static final ILogger LOGGER = 
Logger.getLogger(ResourceUtils.class);
 
-    public static void applyResourceForPipeline(
+    public static Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(
             @NonNull JobMaster jobMaster, @NonNull SubPlan subPlan) {
 
         Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new 
HashMap<>();
@@ -65,6 +65,7 @@ public class ResourceUtils {
         if (futures.size() != slotProfiles.size()) {
             throw new NoEnoughResourceException();
         }
+        return slotProfiles;
     }
 
     private static void allocateResources(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 2dd170956d..abace69bfe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -625,11 +625,33 @@ public class SubPlan {
                 break;
             case SCHEDULED:
                 try {
-                    ResourceUtils.applyResourceForPipeline(jobMaster, this);
+                    Map<TaskGroupLocation, SlotProfile> slotProfiles =
+                            ResourceUtils.applyResourceForPipeline(jobMaster, 
this);
                     log.debug(
                             "slotProfiles: {}, PipelineLocation: {}",
                             slotProfiles,
                             this.getPipelineLocation());
+
+                    // Log task execution locations for the entire pipeline
+                    if (slotProfiles != null && !slotProfiles.isEmpty()) {
+                        log.info(
+                                "Resource allocation for pipeline {} 
completed. Task execution locations:",
+                                getPipelineFullName());
+                        slotProfiles.forEach(
+                                (taskLocation, slotProfile) -> {
+                                    if (slotProfile != null) {
+                                        log.info(
+                                                "  Task [{}] will be executed 
on worker [{}], slotID [{}], resourceProfile [{}], sequence [{}], assigned 
[{}]",
+                                                taskLocation,
+                                                slotProfile.getWorker(),
+                                                slotProfile.getSlotID(),
+                                                
slotProfile.getResourceProfile(),
+                                                slotProfile.getSequence(),
+                                                slotProfile.getOwnerJobID());
+                                    }
+                                });
+                    }
+
                     updatePipelineState(PipelineStatus.DEPLOYING);
                 } catch (Exception e) {
                     makePipelineFailing(e);

Reply via email to