This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new aa8b88a875 [Feature-10871] add workflow executing data query (#10875)
aa8b88a875 is described below

commit aa8b88a875f1abc16c22c7d81c77e31d855a2e21
Author: caishunfeng <[email protected]>
AuthorDate: Mon Jul 11 16:55:55 2022 +0800

    [Feature-10871] add workflow executing data query (#10875)
    
    * add workflow executing data query
    * fix sonar check for interrupted
---
 .../api/controller/ExecutorController.java         |  39 ++++--
 .../api/controller/ProcessInstanceController.java  |  48 ++++---
 .../apache/dolphinscheduler/api/enums/Status.java  |   1 +
 .../api/service/ExecutorService.java               |   8 ++
 .../api/service/impl/ExecutorServiceImpl.java      |  25 ++++
 .../service/impl/ProcessInstanceServiceImpl.java   |  66 ++++-----
 .../controller/WorkflowExecuteController.java      |  51 +++++++
 .../WorkflowExecutingDataRequestProcessor.java     |  65 +++++++++
 .../server/master/rpc/MasterRPCServer.java         |   5 +
 .../server/master/service/ExecutingService.java    |  75 ++++++++++
 .../remote/command/CommandType.java                |  12 +-
 .../WorkflowExecutingDataRequestCommand.java       |  50 +++++++
 .../WorkflowExecutingDataResponseCommand.java      |  51 +++++++
 .../remote/dto/TaskInstanceExecuteDto.java         | 109 +++++++++++++++
 .../remote/dto/WorkflowExecuteDto.java             | 154 +++++++++++++++++++++
 .../processor/StateEventCallbackService.java       |  36 +++--
 16 files changed, 719 insertions(+), 76 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 90c4ecab51..aa48c67448 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller;
 import static 
org.apache.dolphinscheduler.api.enums.Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR;
 import static 
org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR;
 import static 
org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_EXECUTING_WORKFLOW_ERROR;
 import static 
org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR;
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
@@ -38,11 +39,23 @@ import 
org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestAttribute;
@@ -58,16 +71,6 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import springfox.documentation.annotations.ApiIgnore;
 
-import org.apache.commons.lang3.StringUtils;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 /**
  * executor controller
  */
@@ -361,4 +364,20 @@ public class ExecutorController extends BaseController {
         Map<String, Object> result = 
execService.startCheckByProcessDefinedCode(processDefinitionCode);
         return returnDataList(result);
     }
+
+    /**
+     * query execute data of processInstance from master
+     */
+    @ApiOperation(value = "queryExecutingWorkflow", notes = 
"QUERY_WORKFLOW_EXECUTE_DATA")
+    @ApiImplicitParams({
+        @ApiImplicitParam(name = "processInstanceId", value = 
"PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
+    })
+    @GetMapping(value = "/query-executing-workflow")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(QUERY_EXECUTING_WORKFLOW_ERROR)
+    @AccessLogAnnotation
+    public Result queryExecutingWorkflow(@RequestParam("id") Integer 
processInstanceId) {
+        WorkflowExecuteDto workflowExecuteDto = 
execService.queryExecutingWorkflowByProcessInstanceId(processInstanceId);
+        return Result.success(workflowExecuteDto);
+    }
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index 47a9f2183c..90711242f1 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -17,19 +17,17 @@
 
 package org.apache.dolphinscheduler.api.controller;
 
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import static 
org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_BY_ID_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR;
+import static 
org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_INSTANCE_ERROR;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
@@ -40,6 +38,16 @@ import 
org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -54,17 +62,13 @@ import 
org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import springfox.documentation.annotations.ApiIgnore;
-import static 
org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_BY_ID_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR;
-import static 
org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_INSTANCE_ERROR;
 
 /**
  * process instance controller
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 91a45a60a5..25212caafb 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -216,6 +216,7 @@ public enum Status {
     QUERY_AUTHORIZED_USER(10183, "query authorized user error", 
"查询拥有项目权限的用户错误"),
     PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh 
page.", "该项目不存在,请刷新页面"),
     TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", 
"任务实例host为空"),
+    QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", 
"查询运行的工作流实例错误"),
 
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 2d5868b251..b5cf04bd5e 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
 
 import java.util.Map;
 
@@ -111,4 +112,11 @@ public interface ExecutorService {
      * @return
      */
     Map<String, Object> forceStartTaskInstance(User loginUser, int queueId);
+
+    /**
+     * query executing workflow data in Master memory
+     * @param processInstanceId
+     * @return
+     */
+    WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer 
processInstanceId);
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 2bdd7eb6f4..a414891d63 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -69,6 +69,9 @@ import 
org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import 
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
+import 
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.cron.CronUtils;
@@ -991,4 +994,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         }
         return null;
     }
+
+    /**
+     * query executing data of processInstance by master
+     * @param processInstanceId
+     * @return
+     */
+    @Override
+    public WorkflowExecuteDto 
queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
+        ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(processInstanceId);
+        if (processInstance == null) {
+            return null;
+        }
+        Host host = new Host(processInstance.getHost());
+        WorkflowExecutingDataRequestCommand requestCommand = new 
WorkflowExecutingDataRequestCommand();
+        requestCommand.setProcessInstanceId(processInstanceId);
+        org.apache.dolphinscheduler.remote.command.Command command = 
stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
+        if (command == null) {
+            return null;
+        }
+        WorkflowExecutingDataResponseCommand responseCommand = 
JSONUtils.parseObject(command.getBody(), 
WorkflowExecutingDataResponseCommand.class);
+        return responseCommand.getWorkflowExecuteDto();
+    }
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 19f4df3333..2335ff4c88 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -17,26 +17,17 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
+import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
+import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
+import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static 
org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
+import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
 import org.apache.dolphinscheduler.api.dto.gantt.Task;
 import org.apache.dolphinscheduler.api.enums.Status;
@@ -51,7 +42,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -81,21 +71,34 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
-import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
-import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
-import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
-import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
-import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static 
org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
-import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
  * process instance service impl
@@ -459,7 +462,7 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
                                                      String locations, int 
timeout, String tenantCode) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
-        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,INSTANCE_UPDATE );
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, 
projectCode,INSTANCE_UPDATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -833,5 +836,4 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
     public List<ProcessInstance> queryByProcessDefineCode(Long 
processDefinitionCode, int size) {
         return 
processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
     }
-
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
new file mode 100644
index 0000000000..793dc40439
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.controller;
+
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+import org.apache.dolphinscheduler.server.master.service.ExecutingService;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/workflow/execute")
+public class WorkflowExecuteController {
+
+    @Autowired
+    private ExecutingService executingService;
+
+    /**
+     * query workflow execute data in memory
+     * @param processInstanceId
+     * @return
+     */
+    @GetMapping("")
+    @ResponseStatus(HttpStatus.OK)
+    public WorkflowExecuteDto queryExecuteData(@RequestParam("id") int 
processInstanceId) {
+        Optional<WorkflowExecuteDto> workflowExecuteDtoOptional = 
executingService.queryWorkflowExecutingData(processInstanceId);
+        return workflowExecuteDtoOptional.orElse(null);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
new file mode 100644
index 0000000000..c8f70d96d0
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import 
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
+import 
org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.service.ExecutingService;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * workflow executing data process from api/master
+ */
+@Component
+public class WorkflowExecutingDataRequestProcessor implements 
NettyRequestProcessor {
+
+    private final Logger logger = 
LoggerFactory.getLogger(WorkflowExecutingDataRequestProcessor.class);
+
+    @Autowired
+    private ExecutingService executingService;
+
+    @Override
+    public void process(Channel channel, Command command) {
+        
Preconditions.checkArgument(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST == 
command.getType(), String.format("invalid command type: %s", 
command.getType()));
+
+        WorkflowExecutingDataRequestCommand requestCommand = 
JSONUtils.parseObject(command.getBody(), 
WorkflowExecutingDataRequestCommand.class);
+
+        logger.info("received command, processInstanceId:{}", 
requestCommand.getProcessInstanceId());
+
+        Optional<WorkflowExecuteDto> workflowExecuteDtoOptional = 
executingService.queryWorkflowExecutingData(requestCommand.getProcessInstanceId());
+
+        WorkflowExecutingDataResponseCommand responseCommand = new 
WorkflowExecutingDataResponseCommand();
+        
workflowExecuteDtoOptional.ifPresent(responseCommand::setWorkflowExecuteDto);
+        
channel.writeAndFlush(responseCommand.convert2Command(command.getOpaque()));
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index e0b65dd077..49d40a4b04 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -29,6 +29,7 @@ import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponsePr
 import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
+import 
org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
 
 import javax.annotation.PostConstruct;
 
@@ -74,6 +75,9 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private LoggerRequestProcessor loggerRequestProcessor;
 
+    @Autowired
+    private WorkflowExecutingDataRequestProcessor 
workflowExecutingDataRequestProcessor;
+
     @PostConstruct
     private void init() {
         // init remoting server
@@ -88,6 +92,7 @@ public class MasterRPCServer implements AutoCloseable {
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST,
 taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, 
cacheProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, 
taskRecallProcessor);
+        
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
 workflowExecutingDataRequestProcessor);
 
         // logger server
         
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
loggerRequestProcessor);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
new file mode 100644
index 0000000000..fa85b66f82
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.dto.TaskInstanceExecuteDto;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import 
org.apache.dolphinscheduler.server.master.controller.WorkflowExecuteController;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * executing service, to query executing data from memory, such workflow 
instance
+ */
+@Component
+public class ExecutingService {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(WorkflowExecuteController.class);
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    public Optional<WorkflowExecuteDto> queryWorkflowExecutingData(Integer 
processInstanceId) {
+        WorkflowExecuteRunnable workflowExecuteRunnable = 
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            logger.info("workflow execute data not found, maybe it has 
finished, workflow id:{}", processInstanceId);
+            return Optional.empty();
+        }
+        try {
+            WorkflowExecuteDto workflowExecuteDto = new WorkflowExecuteDto();
+            BeanUtils.copyProperties(workflowExecuteDto, 
workflowExecuteRunnable.getProcessInstance());
+            List<TaskInstanceExecuteDto> taskInstanceList = 
Lists.newArrayList();
+            if 
(CollectionUtils.isNotEmpty(workflowExecuteRunnable.getAllTaskInstances())) {
+                for (TaskInstance taskInstance : 
workflowExecuteRunnable.getAllTaskInstances()) {
+                    TaskInstanceExecuteDto taskInstanceExecuteDto = new 
TaskInstanceExecuteDto();
+                    BeanUtils.copyProperties(taskInstanceExecuteDto, 
taskInstance);
+                    taskInstanceList.add(taskInstanceExecuteDto);
+                }
+            }
+            workflowExecuteDto.setTaskInstances(taskInstanceList);
+            return Optional.of(workflowExecuteDto);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            logger.error("query workflow execute data fail, workflow id:{}", 
processInstanceId, e);
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index e540efddfc..43a64fc559 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -158,5 +158,15 @@ public enum CommandType {
     /**
      * task state event request
      */
-    TASK_WAKEUP_EVENT_REQUEST;
+    TASK_WAKEUP_EVENT_REQUEST,
+
+    /**
+     * workflow executing data request, from api to master
+     */
+    WORKFLOW_EXECUTING_DATA_REQUEST,
+
+    /**
+     * workflow executing data response, from master to api
+     */
+    WORKFLOW_EXECUTING_DATA_RESPONSE;
 }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
new file mode 100644
index 0000000000..6085322831
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * workflow executing data request, from api to master
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutingDataRequestCommand implements Serializable {
+
+    private Integer processInstanceId;
+
+    /**
+     * package request command
+     *
+     * @return command
+     */
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
new file mode 100644
index 0000000000..2d2dfa20b0
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * workflow executing data response, from master to api
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkflowExecutingDataResponseCommand implements Serializable {
+
+    private WorkflowExecuteDto workflowExecuteDto;
+
+    /**
+     * package request command
+     *
+     * @return command
+     */
+    public Command convert2Command(long opaque) {
+        Command command = new Command(opaque);
+        command.setType(CommandType.WORKFLOW_EXECUTING_DATA_RESPONSE);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
new file mode 100644
index 0000000000..4bfae1036d
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.dto;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+import java.util.Date;
+import java.util.Map;
+
+import lombok.Data;
+
+@Data
+public class TaskInstanceExecuteDto {
+
+    private int id;
+
+    private String name;
+
+    private String taskType;
+
+    private int processInstanceId;
+
+    private long taskCode;
+
+    private int taskDefinitionVersion;
+
+    private String processInstanceName;
+
+    private int taskGroupPriority;
+
+    private ExecutionStatus state;
+
+    private Date firstSubmitTime;
+
+    private Date submitTime;
+
+    private Date startTime;
+
+    private Date endTime;
+
+    private String host;
+
+    private String executePath;
+
+    private String logPath;
+
+    private int retryTimes;
+
+    private Flag alertFlag;
+
+    private int pid;
+
+    private String appLink;
+
+    private Flag flag;
+
+    private String duration;
+
+    private int maxRetryTimes;
+
+    private int retryInterval;
+
+    private Priority taskInstancePriority;
+
+    private Priority processInstancePriority;
+
+    private String workerGroup;
+
+    private Long environmentCode;
+
+    private String environmentConfig;
+
+    private int executorId;
+
+    private String varPool;
+
+    private String executorName;
+
+    private Map<String, String> resources;
+
+    private int delayTime;
+
+    private String taskParams;
+
+    private int dryRun;
+
+    private int taskGroupId;
+
+    private Integer cpuQuota;
+
+    private Integer memoryMax;
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
new file mode 100644
index 0000000000..64d5542b94
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.dto;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
+import java.util.Collection;
+import java.util.Date;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class WorkflowExecuteDto {
+
+    private int id;
+
+    private String name;
+
+    private Long processDefinitionCode;
+
+    private int processDefinitionVersion;
+
+    private ExecutionStatus state;
+
+    /**
+     * recovery flag for failover
+     */
+    private Flag recovery;
+
+    private Date startTime;
+
+    private Date endTime;
+
+    private int runTimes;
+
+    private String host;
+
+    private CommandType commandType;
+
+    private String commandParam;
+
+    /**
+     * node depend type
+     */
+    private TaskDependType taskDependType;
+
+    private int maxTryTimes;
+
+    /**
+     * failure strategy when task failed.
+     */
+    private FailureStrategy failureStrategy;
+
+    /**
+     * warning type
+     */
+    private WarningType warningType;
+
+    private Integer warningGroupId;
+
+    private Date scheduleTime;
+
+    private Date commandStartTime;
+
+    /**
+     * user define parameters string
+     */
+    private String globalParams;
+
+    /**
+     * executor id
+     */
+    private int executorId;
+
+    /**
+     * executor name
+     */
+    private String executorName;
+
+    /**
+     * tenant code
+     */
+    private String tenantCode;
+
+    /**
+     * queue
+     */
+    private String queue;
+
+    /**
+     * process is sub process
+     */
+    private Flag isSubProcess;
+
+    /**
+     * history command
+     */
+    private String historyCmd;
+
+    /**
+     * depend processes schedule time
+     */
+    private String dependenceScheduleTimes;
+
+    private String duration;
+
+    private Priority processInstancePriority;
+
+    private String workerGroup;
+
+    private Long environmentCode;
+
+    private int timeout;
+
+    private int tenantId;
+
+    /**
+     * varPool string
+     */
+    private String varPool;
+
+    private int nextProcessInstanceId;
+
+    private int dryRun;
+
+    private Date restartTime;
+
+    private boolean isBlocked;
+
+    private Collection<TaskInstanceExecuteDto> taskInstances;
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
index be564261fb..32a0f4673d 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.remote.processor;
 
+import static 
org.apache.dolphinscheduler.common.Constants.HTTP_CONNECTION_REQUEST_TIMEOUT;
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Host;
 
 import java.util.Optional;
@@ -110,17 +112,6 @@ public class StateEventCallbackService {
         REMOTE_CHANNELS.remove(host);
     }
 
-    /**
-     * Send the command to target address, this method doesn't guarantee the 
command send success.
-     *
-     * @param command command need tp send
-     */
-    public void sendResult(String address, int port, Command command) {
-        logger.info("send result, host:{}, command:{}", address, 
command.toString());
-        Host host = new Host(address, port);
-        sendResult(host, command);
-    }
-
     /**
      * Send the command to target host, this method doesn't guarantee the 
command send success.
      *
@@ -133,4 +124,27 @@ public class StateEventCallbackService {
             nettyRemoteChannel.writeAndFlush(command);
         });
     }
+
+    /**
+     * send sync and return response command
+     * @param host
+     * @param requestCommand
+     * @return
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    public Command sendSync(Host host, Command requestCommand) {
+        try {
+            return this.nettyRemotingClient.sendSync(host, requestCommand, 
HTTP_CONNECTION_REQUEST_TIMEOUT);
+        } catch (InterruptedException e) {
+            logger.error("send sync fail, host:{}, command:{}", host, 
requestCommand, e);
+            Thread.currentThread().interrupt();
+        } catch (RemotingException e) {
+            logger.error("send sync fail, host:{}, command:{}", host, 
requestCommand, e);
+        }
+        finally {
+            this.nettyRemotingClient.closeChannel(host);
+        }
+        return null;
+    }
 }

Reply via email to