This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 44ddb6908edba75163485fcaed7156d75bb17b3f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jul 13 22:32:30 2022 +0800

    Fix kill YARN job error during failover caused by ProcessDefinition not being set (#10948)
    
    (cherry picked from commit b245e7c973a5282a1405097605a4afacac1b5eab)
---
 .../master/service/MasterFailoverService.java       | 21 +++++++++++++++------
 .../master/service/WorkerFailoverService.java       |  9 +++++----
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index c7e5b4ea13..61ba7c3fd6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -114,16 +115,18 @@ public class MasterFailoverService {
      * @param masterHost master host
      */
     private void doFailoverMaster(@NonNull String masterHost) {
-        LOGGER.info("Master[{}] failover starting, need to failover process", 
masterHost);
         StopWatch failoverTimeCost = StopWatch.createStarted();
 
-        Optional<Date> masterStartupTimeOptional =
-            
getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
-        List<ProcessInstance> needFailoverProcessInstanceList =
-            processService.queryNeedFailoverProcessInstances(masterHost);
+        Optional<Date> masterStartupTimeOptional = 
getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
+                                                                        
masterHost);
+        List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(
+            masterHost);
+        if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
+            return;
+        }
 
         LOGGER.info(
-            "Master[{}] failover there are {} workflowInstance may need to 
failover, will do a deep check, workflowInstanceIds: {}",
+            "Master[{}] failover starting there are {} workflowInstance may 
need to failover, will do a deep check, workflowInstanceIds: {}",
             masterHost,
             needFailoverProcessInstanceList.size(),
             
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
@@ -136,6 +139,11 @@ public class MasterFailoverService {
                     LOGGER.info("WorkflowInstance doesn't need to failover");
                     continue;
                 }
+                // todo: use batch query
+                ProcessDefinition processDefinition
+                    = 
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                                                           
processInstance.getProcessDefinitionVersion());
+                processInstance.setProcessDefinition(processDefinition);
                 int processInstanceId = processInstance.getId();
                 List<TaskInstance> taskInstanceList = 
processService.findValidTaskListByProcessId(processInstanceId);
                 for (TaskInstance taskInstance : taskInstanceList) {
@@ -205,6 +213,7 @@ public class MasterFailoverService {
             TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(processInstance)
+                
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                 .create();
 
             if (masterConfig.isKillYarnJobWhenTaskFailover()) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index ec126a3ec3..402ec43354 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -115,10 +115,10 @@ public class WorkerFailoverService {
         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
             
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(),
 taskInstance.getId());
             try {
-                ProcessInstance processInstance =
-                    
processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k 
-> {
-                        WorkflowExecuteRunnable workflowExecuteRunnable =
-                            
cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
+                ProcessInstance processInstance = 
processInstanceCacheMap.computeIfAbsent(
+                    taskInstance.getProcessInstanceId(), k -> {
+                        WorkflowExecuteRunnable workflowExecuteRunnable = 
cacheManager.getByProcessInstanceId(
+                            taskInstance.getProcessInstanceId());
                         if (workflowExecuteRunnable == null) {
                             return null;
                         }
@@ -167,6 +167,7 @@ public class WorkerFailoverService {
             TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(processInstance)
+                
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                 .create();
 
             if (masterConfig.isKillYarnJobWhenTaskFailover()) {

Reply via email to