This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b5496a46d0 [Improve][Zeta] Add log to print the each task execute
location (#9205)
b5496a46d0 is described below
commit b5496a46d0a060e1509e56ab67b72a0a2edbc6f9
Author: CosmosNi <[email protected]>
AuthorDate: Tue Apr 29 09:50:20 2025 +0800
[Improve][Zeta] Add log to print the each task execute location (#9205)
---
.../engine/server/dag/physical/ResourceUtils.java | 3 ++-
.../engine/server/dag/physical/SubPlan.java | 24 +++++++++++++++++++++-
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index c5db9909a5..3e2e1eac17 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -38,7 +38,7 @@ public class ResourceUtils {
private static final ILogger LOGGER = Logger.getLogger(ResourceUtils.class);
- public static void applyResourceForPipeline(
+ public static Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(
@NonNull JobMaster jobMaster, @NonNull SubPlan subPlan) {
Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
@@ -65,6 +65,7 @@ public class ResourceUtils {
if (futures.size() != slotProfiles.size()) {
throw new NoEnoughResourceException();
}
+ return slotProfiles;
}
private static void allocateResources(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 2dd170956d..abace69bfe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -625,11 +625,33 @@ public class SubPlan {
break;
case SCHEDULED:
try {
- ResourceUtils.applyResourceForPipeline(jobMaster, this);
+ Map<TaskGroupLocation, SlotProfile> slotProfiles =
+ ResourceUtils.applyResourceForPipeline(jobMaster, this);
log.debug(
"slotProfiles: {}, PipelineLocation: {}",
slotProfiles,
this.getPipelineLocation());
+
+ // Log task execution locations for the entire pipeline
+ if (slotProfiles != null && !slotProfiles.isEmpty()) {
+ log.info(
+ "Resource allocation for pipeline {} completed. Task execution locations:",
+ getPipelineFullName());
+ slotProfiles.forEach(
+ (taskLocation, slotProfile) -> {
+ if (slotProfile != null) {
+ log.info(
+ " Task [{}] will be executed on worker [{}], slotID [{}], resourceProfile [{}], sequence [{}], assigned [{}]",
+ taskLocation,
+ slotProfile.getWorker(),
+ slotProfile.getSlotID(),
+ slotProfile.getResourceProfile(),
+ slotProfile.getSequence(),
+ slotProfile.getOwnerJobID());
+ }
+ });
+ }
+
updatePipelineState(PipelineStatus.DEPLOYING);
} catch (Exception e) {
makePipelineFailing(e);