This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f03753348 [BugFix][Zeta] Fix finding TaskGroup deployment node bug (#4449)
f03753348 is described below

commit f03753348e6de80e41121016ed685659fd44182d
Author: ic4y <[email protected]>
AuthorDate: Sat Apr 1 15:37:26 2023 +0800

    [BugFix][Zeta] Fix finding TaskGroup deployment node bug (#4449)
---
 .../common/config/server/ServerConfigOptions.java  |  2 +-
 .../server/checkpoint/CheckpointManager.java       | 10 +++++++-
 .../seatunnel/engine/server/master/JobMaster.java  | 29 +++++++++++-----------
 .../server/scheduler/PipelineBaseScheduler.java    |  5 +++-
 .../operation/GetTaskGroupAddressOperation.java    |  5 +---
 5 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index c49929586..b5d02c034 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -49,7 +49,7 @@ public class ServerConfigOptions {
     public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL =
             Options.key("job-metrics-backup-interval")
                     .intType()
-                    .defaultValue(60)
+                    .defaultValue(10)
                     .withDescription("The interval (in seconds) of job metrics backups");
 
     public static final Option<ThreadShareMode> TASK_EXECUTION_THREAD_SHARE_MODE =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 3e18a458e..971266e09 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -269,11 +269,19 @@ public class CheckpointManager {
     }
 
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
+        log.debug(
+                "Sead Operation : "
+                        + operation.getClass().getSimpleName()
+                        + " to "
+                        + jobMaster.queryTaskGroupAddress(
+                                operation.getTaskLocation().getTaskGroupLocation())
+                        + " for task group:"
+                        + operation.getTaskLocation().getTaskGroupLocation());
         return NodeEngineUtil.sendOperationToMemberNode(
                 nodeEngine,
                 operation,
                 jobMaster.queryTaskGroupAddress(
-                        operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
+                        operation.getTaskLocation().getTaskGroupLocation()));
     }
 
     /**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 0b840e22c..23d4010d3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -78,7 +78,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
@@ -400,23 +399,23 @@ public class JobMaster {
         removeJobIMap();
     }
 
-    public Address queryTaskGroupAddress(long taskGroupId) {
-        for (PipelineLocation pipelineLocation : ownedSlotProfilesIMap.keySet()) {
-            Optional<TaskGroupLocation> currentVertex =
-                    ownedSlotProfilesIMap.get(pipelineLocation).keySet().stream()
-                            .filter(
-                                    taskGroupLocation ->
-                                            taskGroupLocation.getTaskGroupId() == taskGroupId)
-                            .findFirst();
-            if (currentVertex.isPresent()) {
-                return ownedSlotProfilesIMap
-                        .get(pipelineLocation)
-                        .get(currentVertex.get())
-                        .getWorker();
+    public Address queryTaskGroupAddress(TaskGroupLocation taskGroupLocation) {
+
+        PipelineLocation pipelineLocation =
+                new PipelineLocation(
+                        taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId());
+
+        Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+
+        if (null != taskGroupLocationSlotProfileMap) {
+            SlotProfile slotProfile = taskGroupLocationSlotProfileMap.get(taskGroupLocation);
+            if (null != slotProfile) {
+                return slotProfile.getWorker();
             }
         }
         throw new IllegalArgumentException(
-                "can't find task group address from task group id: " + taskGroupId);
+                "can't find task group address from taskGroupLocation: " + taskGroupLocation);
     }
 
     public ClassLoader getClassLoader() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 2bc3b2a5b..fc6aff87f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -97,7 +97,10 @@ public class PipelineBaseScheduler implements JobScheduler {
                             pipeline,
                             jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));
 
-            log.debug("slotProfiles: {}", slotProfiles);
+            log.debug(
+                    "slotProfiles: {}, PipelineLocation: {}",
+                    slotProfiles,
+                    pipeline.getPipelineLocation());
 
             // To ensure release pipeline resource after new master node active, we need store
             // slotProfiles first and then deploy tasks.
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 50a887176..ecb94e85c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -52,10 +52,7 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
                         () ->
                                 server.getCoordinatorService()
                                         .getJobMaster(taskLocation.getJobId())
-                                        .queryTaskGroupAddress(
-                                                taskLocation
-                                                        .getTaskGroupLocation()
-                                                        .getTaskGroupId()),
+                                        .queryTaskGroupAddress(taskLocation.getTaskGroupLocation()),
                         new RetryUtils.RetryMaterial(
                                 Constant.OPERATION_RETRY_TIME,
                                 true,

Reply via email to