This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4351a25f2a Add execute function to handle the workflow instance operation (#13610)
4351a25f2a is described below

commit 4351a25f2a1d0eb965ebb0db080c46575b0b0e25
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 24 13:47:41 2023 +0800

    Add execute function to handle the workflow instance operation (#13610)
---
 .../api/controller/ExecutorController.java         |   2 -
 .../dolphinscheduler/api/enums/ExecuteType.java    |  35 ++-
 .../apache/dolphinscheduler/api/enums/Status.java  |   2 +-
 .../api/executor/ExecuteClient.java                |  62 +++++
 .../api/executor/ExecuteContext.java               |  51 +++++
 .../ExecuteFunction.java}                          |  34 ++-
 .../api/executor/ExecuteFunctionBuilder.java       |  15 +-
 .../api/executor/ExecuteRequest.java               |   5 +-
 .../api/executor/ExecuteResult.java                |   5 +-
 .../api/executor/ExecuteRuntimeException.java      |  46 ++++
 .../recovery/FailureRecoveryExecuteFunction.java   |  64 ++++++
 .../FailureRecoveryExecuteFunctionBuilder.java     |  59 +++++
 .../failure/recovery/FailureRecoveryRequest.java   |  29 +--
 .../failure/recovery/FailureRecoveryResult.java    |  13 +-
 .../instance/pause/pause/PauseExecuteFunction.java |  80 +++++++
 .../pause/pause/PauseExecuteFunctionBuilder.java   |  60 +++++
 .../instance/pause/pause/PauseExecuteRequest.java  |  30 +--
 .../instance/pause/pause/PauseExecuteResult.java   |  12 +-
 .../pause/recover/RecoverExecuteFunction.java      |  78 +++++++
 .../recover/RecoverExecuteFunctionBuilder.java     |  60 +++++
 .../pause/recover/RecoverExecuteRequest.java       |  29 +--
 .../pause/recover/RecoverExecuteResult.java        |  12 +-
 .../rerun/RepeatRunningExecuteFunction.java        |  93 ++++++++
 .../rerun/RepeatRunningExecuteFunctionBuilder.java |  59 +++++
 .../instance/rerun/RepeatRunningRequest.java       |  27 +--
 .../instance/rerun/RepeatRunningResult.java        |  13 +-
 .../instance/stop/StopExecuteFunction.java         |  87 +++++++
 .../instance/stop/StopExecuteFunctionBuilder.java  |  56 +++++
 .../workflow/instance/stop/StopRequest.java        |  16 +-
 .../workflow/instance/stop/StopResult.java         |  16 +-
 .../dolphinscheduler/api/rpc/ApiRpcClient.java     |  29 ++-
 .../api/service/ProcessDefinitionService.java      |  11 +-
 .../api/service/ProcessInstanceService.java        |   2 +
 .../api/service/ProjectService.java                |   5 +-
 .../api/service/impl/ExecutorServiceImpl.java      | 252 +++++----------------
 .../service/impl/ProcessDefinitionServiceImpl.java |  20 ++
 .../service/impl/ProcessInstanceServiceImpl.java   |   9 +
 .../api/service/impl/ProjectServiceImpl.java       |   6 +
 ...est.java => ExecuteFunctionControllerTest.java} |   2 +-
 ...ceTest.java => ExecuteFunctionServiceTest.java} |  49 ++--
 .../dolphinscheduler/dao/entity/Command.java       |  21 +-
 .../dao/repository/ProcessDefinitionLogDao.java    |   4 +
 .../dao/repository/ProcessInstanceDao.java         |   1 +
 .../impl/ProcessDefinitionLogDaoImpl.java          |   7 +
 .../repository/impl/ProcessInstanceDaoImpl.java    |   5 +
 .../plugin/task/api/utils/ShellUtils.java          |   7 +-
 46 files changed, 1217 insertions(+), 363 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 16ec43dbdd..4f474821a7 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -306,8 +306,6 @@ public class ExecutorController extends BaseController {
                           @Parameter(name = "projectCode", description = 
"PROJECT_CODE", required = true) @PathVariable long projectCode,
                           @RequestParam("processInstanceId") Integer 
processInstanceId,
                           @RequestParam("executeType") ExecuteType 
executeType) {
-        log.info("Start to execute process instance, projectCode:{}, 
processInstanceId:{}.", projectCode,
-                processInstanceId);
         Map<String, Object> result = execService.execute(loginUser, 
projectCode, processInstanceId, executeType);
         return returnDataList(result);
     }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
index fea69bf22d..ff3f122a23 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
@@ -30,14 +30,43 @@ public enum ExecuteType {
      * 4 stop
      * 5 pause
      */
-    NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, 
START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
+    NONE(0, "NONE"),
+
+    // ******************************* Workflow ***************************
+    REPEAT_RUNNING(1, "REPEAT_RUNNING"),
+    RECOVER_SUSPENDED_PROCESS(2, "RECOVER_SUSPENDED_PROCESS"),
+    START_FAILURE_TASK_PROCESS(3, "START_FAILURE_TASK_PROCESS"),
+    STOP(4, "STOP"),
+    PAUSE(5, "PAUSE"),
+    // ******************************* Workflow ***************************
+
+    // ******************************* Task *******************************
+    EXECUTE_TASK(6, "EXECUTE_TASK"),
+    // ******************************* Task *******************************
+    ;
+
+    private final int code;
+    private final String desc;
+
+    ExecuteType(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
 
     public static ExecuteType getEnum(int value) {
         for (ExecuteType e : ExecuteType.values()) {
-            if (e.ordinal() == value) {
+            if (e.getCode() == value) {
                 return e;
             }
         }
-        return null;
+        return NONE;
     }
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 97db8eb78f..168d0f94ae 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -532,7 +532,7 @@ public enum Status {
 
     NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have 
this permission.", "当前用户无此权限"),
     FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"),
-    SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed 
100.", "补数日期个数超过100"),
+    SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates 
exceed 100.", "补数日期个数超过100"),
     DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error", 
"描述过长"),
     DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005,
             "delete worker group fail, for there are [{0}] enviroments 
using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}");
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
new file mode 100644
index 0000000000..f8f685b4b3
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * This is the main class for executing workflow/workflowInstance/tasks.
+ * <pre>
+ *     ExecuteContext executeContext = new ExecuteContext(
+ *         workflowInstance,
+ *         workflowDefinition,
+ *         executeUser, executeType);
+ *     executeClient.executeWorkflowInstance(executeContext);
+ * </pre>
+ */
+@Component
+@SuppressWarnings("unchecked")
+public class ExecuteClient {
+
+    private final Map<ExecuteType, ExecuteFunctionBuilder> 
executorFunctionBuilderMap;
+
+    public ExecuteClient(List<ExecuteFunctionBuilder> executeFunctionBuilders) 
{
+        executorFunctionBuilderMap = executeFunctionBuilders.stream()
+                
.collect(Collectors.toMap(ExecuteFunctionBuilder::getExecuteType, 
Function.identity()));
+    }
+
+    public ExecuteResult executeWorkflowInstance(ExecuteContext 
executeContext) throws ExecuteRuntimeException {
+        ExecuteFunctionBuilder<ExecuteRequest, ExecuteResult> 
executeFunctionBuilder = checkNotNull(
+                
executorFunctionBuilderMap.get(executeContext.getExecuteType()),
+                String.format("The executeType: %s is not supported", 
executeContext.getExecuteType()));
+
+        return 
executeFunctionBuilder.createWorkflowInstanceExecuteFunction(executeContext)
+                
.thenCombine(executeFunctionBuilder.createWorkflowInstanceExecuteRequest(executeContext),
+                        ExecuteFunction::execute)
+                .join();
+    }
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
new file mode 100644
index 0000000000..84c100b019
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import lombok.Data;
+
+// todo: convert this class into an interface
+@Data
+public class ExecuteContext {
+
+    private final ProcessInstance workflowInstance;
+
+    private final ProcessDefinition workflowDefinition;
+
+    private final User executeUser;
+
+    private final ExecuteType executeType;
+
+    public ExecuteContext(ProcessInstance workflowInstance,
+                          ProcessDefinition workflowDefinition,
+                          User executeUser,
+                          ExecuteType executeType) {
+        this.workflowInstance = checkNotNull(workflowInstance, 
"workflowInstance cannot be null");
+        this.workflowDefinition = checkNotNull(workflowDefinition, 
"workflowDefinition cannot be null");
+        this.executeUser = checkNotNull(executeUser, "executeUser cannot be 
null");
+        this.executeType = checkNotNull(executeType, "executeType cannot be 
null");
+    }
+
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
similarity index 58%
copy from 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
index fea69bf22d..8945c0d678 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
@@ -15,29 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.enums;
+package org.apache.dolphinscheduler.api.executor;
 
-/**
- * execute type
- */
-public enum ExecuteType {
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
+public interface ExecuteFunction<Request extends ExecuteRequest, Result 
extends ExecuteResult> {
 
     /**
-     * operation type
-     * 1 repeat running
-     * 2 resume pause
-     * 3 resume failure
-     * 4 stop
-     * 5 pause
+     * Execute the workflow by the given request.
+     *
+     * @param request execute request
+     * @return execute result
+     * @throws ExecuteRuntimeException If there is an exception during 
execution, it will be thrown.
      */
-    NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, 
START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
+    Result execute(Request request) throws ExecuteRuntimeException;
 
-    public static ExecuteType getEnum(int value) {
-        for (ExecuteType e : ExecuteType.values()) {
-            if (e.ordinal() == value) {
-                return e;
-            }
-        }
-        return null;
-    }
+    /**
+     * @return the type of the executor
+     */
+    ExecuteType getExecuteType();
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
similarity index 60%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
index 7ba19d6892..49dc9e9f8d 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
@@ -15,9 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface ExecuteFunctionBuilder<Request extends ExecuteRequest, Result 
extends ExecuteResult> {
+
+    CompletableFuture<ExecuteFunction<Request, Result>> 
createWorkflowInstanceExecuteFunction(ExecuteContext executeContext);
+
+    CompletableFuture<Request> 
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext);
+
+    ExecuteType getExecuteType();
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
similarity index 83%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
index 7ba19d6892..d5b02c5eba 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor;
 
-public interface ProcessDefinitionLogDao {
+public interface ExecuteRequest {
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
similarity index 83%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
index 7ba19d6892..52ae3c8d5f 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor;
 
-public interface ProcessDefinitionLogDao {
+public interface ExecuteResult {
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
new file mode 100644
index 0000000000..1cdebb0952
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor;
+
+// todo: extend DolphinSchedulerRuntimeException instead of RuntimeException
+public class ExecuteRuntimeException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String EXECUTE_WORKFLOW_INSTANCE_ERROR =
+            "Execute workflow instance %s failed, execute type is %s";
+
+    public ExecuteRuntimeException(String message) {
+        super(message);
+    }
+
+    public ExecuteRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public static ExecuteRuntimeException 
executeWorkflowInstanceError(ExecuteContext executeContext) {
+        return executeWorkflowInstanceError(executeContext, null);
+    }
+
+    public static ExecuteRuntimeException 
executeWorkflowInstanceError(ExecuteContext executeContext, Throwable cause) {
+        return new ExecuteRuntimeException(
+                String.format(EXECUTE_WORKFLOW_INSTANCE_ERROR, 
executeContext.getWorkflowInstance().getName(),
+                        executeContext.getExecuteType()),
+                cause);
+    }
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
new file mode 100644
index 0000000000..546e632b80
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+public class FailureRecoveryExecuteFunction implements 
ExecuteFunction<FailureRecoveryRequest, FailureRecoveryResult> {
+
+    private final CommandService commandService;
+
+    public FailureRecoveryExecuteFunction(CommandService commandService) {
+        this.commandService = commandService;
+    }
+
+    @Override
+    public FailureRecoveryResult execute(FailureRecoveryRequest request) 
throws ExecuteRuntimeException {
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+        if (!workflowInstance.getState().isFailure()) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s status is %s, can 
not be recovered",
+                            workflowInstance.getName(), 
workflowInstance.getState()));
+        }
+
+        Command command = Command.builder()
+                .commandType(CommandType.START_FAILURE_TASK_PROCESS)
+                
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
+                
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
+                .processInstanceId(workflowInstance.getId())
+                .executorId(request.getExecuteUser().getId())
+                .testFlag(workflowInstance.getTestFlag())
+                .build();
+        if (commandService.createCommand(command) <= 0) {
+            throw new ExecuteRuntimeException(
+                    "Failure recovery workflow instance failed, due to insert 
command to db failed");
+        }
+        return new FailureRecoveryResult(command.getId());
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return FailureRecoveryExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..1509220bee
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class FailureRecoveryExecuteFunctionBuilder
+        implements
+            ExecuteFunctionBuilder<FailureRecoveryRequest, 
FailureRecoveryResult> {
+
+    public static final ExecuteType EXECUTE_TYPE = 
ExecuteType.START_FAILURE_TASK_PROCESS;
+
+    @Autowired
+    private CommandService commandService;
+
+    @Override
+    public CompletableFuture<ExecuteFunction<FailureRecoveryRequest, 
FailureRecoveryResult>> createWorkflowInstanceExecuteFunction(ExecuteContext 
executeContext) {
+        return CompletableFuture.completedFuture(new 
FailureRecoveryExecuteFunction(commandService));
+    }
+
+    @Override
+    public CompletableFuture<FailureRecoveryRequest> 
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+        return CompletableFuture.completedFuture(
+                new FailureRecoveryRequest(
+                        executeContext.getWorkflowInstance(),
+                        executeContext.getWorkflowDefinition(),
+                        executeContext.getExecuteUser()));
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return EXECUTE_TYPE;
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
similarity index 60%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
index f9002c823d..e6e7231540 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
@@ -15,27 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
 
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-public interface ProcessInstanceDao {
+@Data
+@AllArgsConstructor
+public class FailureRecoveryRequest implements ExecuteRequest {
 
-    public int insertProcessInstance(ProcessInstance processInstance);
-
-    public int updateProcessInstance(ProcessInstance processInstance);
-
-    /**
-     * insert or update work process instance to database
-     *
-     * @param processInstance processInstance
-     */
-    public int upsertProcessInstance(ProcessInstance processInstance);
-
-    void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
-    void deleteById(Integer workflowInstanceId);
+    private final ProcessInstance workflowInstance;
+    private final ProcessDefinition workflowDefinition;
+    private final User executeUser;
 
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
similarity index 71%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
index 7ba19d6892..c75ed591c7 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
@@ -15,9 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class FailureRecoveryResult implements ExecuteResult {
+
+    private final Integer commandId;
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
new file mode 100644
index 0000000000..83711660f8
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import 
org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+/**
+ * Pauses a running workflow instance.
+ * <p>
+ * The instance is first persisted with state {@code READY_PAUSE}, then a state-change command is sent over RPC
+ * to the master recorded in the instance's host field.
+ */
+public class PauseExecuteFunction implements ExecuteFunction<PauseExecuteRequest, PauseExecuteResult> {
+
+    private final ProcessInstanceDao processInstanceDao;
+
+    private final ApiRpcClient apiRpcClient;
+
+    public PauseExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
+        this.processInstanceDao = processInstanceDao;
+        this.apiRpcClient = apiRpcClient;
+    }
+
+    /**
+     * Marks the workflow instance of the request as {@code READY_PAUSE} and notifies its master.
+     *
+     * @param request carries the workflow instance to pause and the user who triggers the pause
+     * @return a result wrapping the updated workflow instance
+     * @throws ExecuteRuntimeException if the instance has no state or is not running, if the database update
+     *                                 affects no row, or if the RPC to the master fails; this method itself does
+     *                                 not revert the database update when the RPC fails
+     */
+    @Override
+    public PauseExecuteResult execute(PauseExecuteRequest request) throws ExecuteRuntimeException {
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+        // A missing state is rejected like any other non-running state instead of failing with an NPE.
+        if (workflowInstance.getState() == null || !workflowInstance.getState().isRunning()) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s status is %s, can not pause", workflowInstance.getName(),
+                            workflowInstance.getState()));
+        }
+        workflowInstance.setCommandType(CommandType.PAUSE);
+        workflowInstance.addHistoryCmd(CommandType.PAUSE);
+        workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE,
+                CommandType.PAUSE.getDescp() + " by " + request.getExecuteUser().getUserName());
+
+        // Persist the new state before notifying the master.
+        if (processInstanceDao.updateProcessInstance(workflowInstance) <= 0) {
+            throw new ExecuteRuntimeException(
+                    String.format(
+                            "The workflow instance: %s pause failed, due to update the workflow instance status in DB failed",
+                            workflowInstance.getName()));
+        }
+        WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
+                workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
+        try {
+            apiRpcClient.send(Host.of(workflowInstance.getHost()), workflowStateEventChangeCommand.convert2Command());
+        } catch (RemotingException e) {
+            throw new ExecuteRuntimeException(
+                    String.format(
+                            "The workflow instance: %s pause failed, due to send rpc request to master: %s failed",
+                            workflowInstance.getName(), workflowInstance.getHost()),
+                    e);
+        }
+        return new PauseExecuteResult(workflowInstance);
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return PauseExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..2e8cf21afd
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Spring component that creates the {@link PauseExecuteFunction} and its {@link PauseExecuteRequest} for
+ * {@link ExecuteType#PAUSE}.
+ */
+@Component
+public class PauseExecuteFunctionBuilder implements ExecuteFunctionBuilder<PauseExecuteRequest, PauseExecuteResult> {
+
+    public static final ExecuteType EXECUTE_TYPE = ExecuteType.PAUSE;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+    @Autowired
+    private ApiRpcClient apiRpcClient;
+
+    /**
+     * @return the execute type served by this builder
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return EXECUTE_TYPE;
+    }
+
+    /**
+     * Builds the pause request from the workflow definition, workflow instance and user of the context.
+     *
+     * @param executeContext source of the request fields
+     * @return an already-completed future holding the request
+     */
+    @Override
+    public CompletableFuture<PauseExecuteRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+        final PauseExecuteRequest pauseRequest = new PauseExecuteRequest(
+                executeContext.getWorkflowDefinition(),
+                executeContext.getWorkflowInstance(),
+                executeContext.getExecuteUser());
+        return CompletableFuture.completedFuture(pauseRequest);
+    }
+
+    /**
+     * Builds a new pause function wired with the injected DAO and RPC client.
+     *
+     * @param executeContext not read by this builder
+     * @return an already-completed future holding the function
+     */
+    @Override
+    public CompletableFuture<ExecuteFunction<PauseExecuteRequest, PauseExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
+        final ExecuteFunction<PauseExecuteRequest, PauseExecuteResult> pauseFunction =
+                new PauseExecuteFunction(processInstanceDao, apiRpcClient);
+        return CompletableFuture.completedFuture(pauseFunction);
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
similarity index 60%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
index f9002c823d..02cb471961 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
@@ -15,27 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
 
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-public interface ProcessInstanceDao {
-
-    public int insertProcessInstance(ProcessInstance processInstance);
-
-    public int updateProcessInstance(ProcessInstance processInstance);
-
-    /**
-     * insert or update work process instance to database
-     *
-     * @param processInstance processInstance
-     */
-    public int upsertProcessInstance(ProcessInstance processInstance);
-
-    void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
-    void deleteById(Integer workflowInstanceId);
+/**
+ * Request for the pause operation: bundles the workflow definition, the workflow instance and the user who
+ * triggers the operation.
+ * <p>
+ * The field order defines the argument order of the Lombok-generated all-args constructor, which
+ * PauseExecuteFunctionBuilder calls as (definition, instance, user); keep both in sync.
+ */
+@Data
+@AllArgsConstructor
+public class PauseExecuteRequest implements ExecuteRequest {
 
+    private final ProcessDefinition processDefinition;
+    private final ProcessInstance workflowInstance;
+    private final User executeUser;
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
similarity index 68%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
index 7ba19d6892..38bbc03c07 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
@@ -15,9 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Result of the pause operation; wraps the workflow instance that PauseExecuteFunction updated.
+ * <p>
+ * {@code @Data} exposes the wrapped instance through a getter, matching the other result types
+ * (FailureRecoveryResult, RepeatRunningResult). Without it the field could not be read by callers.
+ */
+@Data
+@AllArgsConstructor
+public class PauseExecuteResult implements ExecuteResult {
+
+    private final ProcessInstance processInstance;
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
new file mode 100644
index 0000000000..149e1abd29
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
+
+import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Recovers a paused workflow instance by inserting a {@code RECOVER_SUSPENDED_PROCESS} command.
+ */
+public class RecoverExecuteFunction implements ExecuteFunction<RecoverExecuteRequest, RecoverExecuteResult> {
+
+    private final CommandService commandService;
+
+    public RecoverExecuteFunction(CommandService commandService) {
+        this.commandService = commandService;
+    }
+
+    /**
+     * Creates the recover command for the workflow instance of the request.
+     *
+     * @param request carries the paused workflow instance and the user who triggers the recovery
+     * @return a result wrapping the created command
+     * @throws ExecuteRuntimeException if the instance has no state or is not paused, or if the command could
+     *                                 not be created
+     */
+    @Override
+    public RecoverExecuteResult execute(RecoverExecuteRequest request) throws ExecuteRuntimeException {
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+        // A missing state is rejected like any other non-paused state instead of failing with an NPE.
+        if (workflowInstance.getState() == null || !workflowInstance.getState().isPause()) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(),
+                            workflowInstance.getState()));
+        }
+        Command command = Command.builder()
+                .commandType(CommandType.RECOVER_SUSPENDED_PROCESS)
+                .processDefinitionCode(workflowInstance.getProcessDefinitionCode())
+                .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
+                .processInstanceId(workflowInstance.getId())
+                .commandParam(JSONUtils.toJsonString(createCommandParam(workflowInstance)))
+                .executorId(request.getExecuteUser().getId())
+                .testFlag(workflowInstance.getTestFlag())
+                .build();
+        if (commandService.createCommand(command) <= 0) {
+            throw new ExecuteRuntimeException(
+                    String.format("Recovery workflow instance: %s failed, due to insert command to db failed",
+                            workflowInstance.getName()));
+        }
+        return new RecoverExecuteResult(command);
+    }
+
+    /**
+     * Builds the command parameters: a single entry mapping the recover-process-id key to the instance id.
+     */
+    private Map<String, Object> createCommandParam(ProcessInstance workflowInstance) {
+        return new ImmutableMap.Builder<String, Object>()
+                .put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId())
+                .build();
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return RecoverExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..cbd88a0d2d
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Spring component that creates the {@link RecoverExecuteFunction} and its {@link RecoverExecuteRequest} for
+ * {@link ExecuteType#RECOVER_SUSPENDED_PROCESS}.
+ */
+@Component
+public class RecoverExecuteFunctionBuilder implements ExecuteFunctionBuilder<RecoverExecuteRequest, RecoverExecuteResult> {
+
+    public static final ExecuteType EXECUTE_TYPE = ExecuteType.RECOVER_SUSPENDED_PROCESS;
+
+    @Autowired
+    private CommandService commandService;
+
+    /**
+     * @return the execute type served by this builder
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return EXECUTE_TYPE;
+    }
+
+    /**
+     * Builds the recover request from the workflow instance, workflow definition and user of the context.
+     *
+     * @param executeContext source of the request fields
+     * @return an already-completed future holding the request
+     */
+    @Override
+    public CompletableFuture<RecoverExecuteRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+        final RecoverExecuteRequest recoverRequest = new RecoverExecuteRequest(
+                executeContext.getWorkflowInstance(),
+                executeContext.getWorkflowDefinition(),
+                executeContext.getExecuteUser());
+        return CompletableFuture.completedFuture(recoverRequest);
+    }
+
+    /**
+     * Builds a new recover function wired with the injected command service.
+     *
+     * @param executeContext not read by this builder
+     * @return an already-completed future holding the function
+     */
+    @Override
+    public CompletableFuture<ExecuteFunction<RecoverExecuteRequest, RecoverExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
+        final ExecuteFunction<RecoverExecuteRequest, RecoverExecuteResult> recoverFunction =
+                new RecoverExecuteFunction(commandService);
+        return CompletableFuture.completedFuture(recoverFunction);
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
similarity index 60%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
index f9002c823d..8f8b8f4e84 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
@@ -15,27 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
 
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-public interface ProcessInstanceDao {
+/**
+ * Request for the recover operation: bundles the workflow instance, its workflow definition and the user who
+ * triggers the operation.
+ * <p>
+ * The field order defines the argument order of the Lombok-generated all-args constructor, which
+ * RecoverExecuteFunctionBuilder calls as (instance, definition, user); keep both in sync.
+ */
+@Data
+@AllArgsConstructor
+public class RecoverExecuteRequest implements ExecuteRequest {
 
-    public int insertProcessInstance(ProcessInstance processInstance);
-
-    public int updateProcessInstance(ProcessInstance processInstance);
-
-    /**
-     * insert or update work process instance to database
-     *
-     * @param processInstance processInstance
-     */
-    public int upsertProcessInstance(ProcessInstance processInstance);
-
-    void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
-    void deleteById(Integer workflowInstanceId);
+    private final ProcessInstance workflowInstance;
+    private final ProcessDefinition processDefinition;
+    private final User executeUser;
 
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
similarity index 69%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
index 7ba19d6892..882174cddd 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
@@ -15,9 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package 
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
+import org.apache.dolphinscheduler.dao.entity.Command;
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Result of the recover operation; wraps the command that RecoverExecuteFunction created.
+ * <p>
+ * {@code @Data} exposes the wrapped command through a getter, matching the other result types
+ * (FailureRecoveryResult, RepeatRunningResult). Without it the field could not be read by callers.
+ */
+@Data
+@AllArgsConstructor
+public class RecoverExecuteResult implements ExecuteResult {
+
+    private final Command command;
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
new file mode 100644
index 0000000000..82c59b907f
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+/**
+ * Re-runs a finished workflow instance by inserting a {@code REPEAT_RUNNING} command.
+ */
+public class RepeatRunningExecuteFunction implements ExecuteFunction<RepeatRunningRequest, RepeatRunningResult> {
+
+    private final CommandService commandService;
+
+    public RepeatRunningExecuteFunction(CommandService commandService) {
+        this.commandService = commandService;
+    }
+
+    /**
+     * Creates the repeat-running command for the workflow instance of the request.
+     *
+     * @param request carries the finished workflow instance and the user who triggers the rerun
+     * @return a result holding the id of the created command
+     * @throws ExecuteRuntimeException if the instance has no state or is not finished, or if the command could
+     *                                 not be created
+     */
+    @Override
+    public RepeatRunningResult execute(RepeatRunningRequest request) throws ExecuteRuntimeException {
+        checkNotNull(request, "request cannot be null");
+        // todo: check workflow definition valid? or we don't need to do this check, since we will check in master
+        // again.
+        // todo: check tenant valid? or we don't need to do this check, since we need to check in master again.
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+        if (workflowInstance.getState() == null || !workflowInstance.getState().isFinished()) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s status is %s, cannot repeat running",
+                            workflowInstance.getName(), workflowInstance.getState()));
+        }
+        Command command = Command.builder()
+                .commandType(CommandType.REPEAT_RUNNING)
+                .commandParam(JSONUtils.toJsonString(createCommandParams(workflowInstance)))
+                .processDefinitionCode(workflowInstance.getProcessDefinitionCode())
+                .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
+                .processInstanceId(workflowInstance.getId())
+                .processInstancePriority(workflowInstance.getProcessInstancePriority())
+                // Record who triggered the rerun, as RecoverExecuteFunction does for its command.
+                .executorId(request.getExecuteUser().getId())
+                .testFlag(workflowInstance.getTestFlag())
+                .build();
+        if (commandService.createCommand(command) <= 0) {
+            throw new ExecuteRuntimeException(
+                    String.format("Repeat running workflow instance: %s failed, due to insert command to db failed",
+                            workflowInstance.getName()));
+        }
+        return new RepeatRunningResult(command.getId());
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return RepeatRunningExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+
+    /**
+     * Builds the command parameters: the start params of the instance's previous command param (when present)
+     * plus the recover-process-id entry pointing at the instance.
+     */
+    private Map<String, Object> createCommandParams(ProcessInstance workflowInstance) {
+        Map<String, Object> commandMap =
+                JSONUtils.parseObject(workflowInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
+                });
+        Map<String, Object> repeatRunningCommandParams = new HashMap<>();
+        // Only the start params are carried over; other entries of the previous command param are dropped.
+        Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS))
+                .ifPresent(startParams -> repeatRunningCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+        repeatRunningCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId());
+        return repeatRunningCommandParams;
+    }
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..f5363f90da
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Spring component that creates the {@link RepeatRunningExecuteFunction} and its {@link RepeatRunningRequest}
+ * for {@link ExecuteType#REPEAT_RUNNING}.
+ */
+@Component
+public class RepeatRunningExecuteFunctionBuilder implements ExecuteFunctionBuilder<RepeatRunningRequest, RepeatRunningResult> {
+
+    public static final ExecuteType EXECUTE_TYPE = ExecuteType.REPEAT_RUNNING;
+
+    @Autowired
+    private CommandService commandService;
+
+    /**
+     * @return the execute type served by this builder
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return EXECUTE_TYPE;
+    }
+
+    /**
+     * Builds the repeat-running request from the workflow instance, workflow definition and user of the context.
+     *
+     * @param executeContext source of the request fields
+     * @return an already-completed future holding the request
+     */
+    @Override
+    public CompletableFuture<RepeatRunningRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+        final RepeatRunningRequest repeatRunningRequest = new RepeatRunningRequest(
+                executeContext.getWorkflowInstance(),
+                executeContext.getWorkflowDefinition(),
+                executeContext.getExecuteUser());
+        return CompletableFuture.completedFuture(repeatRunningRequest);
+    }
+
+    /**
+     * Builds a new repeat-running function wired with the injected command service.
+     *
+     * @param executeContext not read by this builder
+     * @return an already-completed future holding the function
+     */
+    @Override
+    public CompletableFuture<ExecuteFunction<RepeatRunningRequest, RepeatRunningResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
+        final ExecuteFunction<RepeatRunningRequest, RepeatRunningResult> repeatRunningFunction =
+                new RepeatRunningExecuteFunction(commandService);
+        return CompletableFuture.completedFuture(repeatRunningFunction);
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
similarity index 60%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
index f9002c823d..c0631190c2 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
@@ -15,27 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
 
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-public interface ProcessInstanceDao {
+/**
+ * Request for the repeat-running operation: bundles the workflow instance, its workflow definition and the
+ * user who triggers the operation.
+ * <p>
+ * The field order defines the argument order of the Lombok-generated all-args constructor, which
+ * RepeatRunningExecuteFunctionBuilder calls as (instance, definition, user); keep both in sync.
+ */
+@Data
+@AllArgsConstructor
+public class RepeatRunningRequest implements ExecuteRequest {
 
-    public int insertProcessInstance(ProcessInstance processInstance);
+    private final ProcessInstance workflowInstance;
 
-    public int updateProcessInstance(ProcessInstance processInstance);
+    private final ProcessDefinition processDefinition;
 
-    /**
-     * insert or update work process instance to database
-     *
-     * @param processInstance processInstance
-     */
-    public int upsertProcessInstance(ProcessInstance processInstance);
-
-    void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
-    void deleteById(Integer workflowInstanceId);
+    private final User executeUser;
 
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
similarity index 71%
copy from 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
index 7ba19d6892..9fa7ed0004 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
@@ -15,9 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class RepeatRunningResult implements ExecuteResult {
+
+    private final Integer commandId;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
new file mode 100644
index 0000000000..2e37423e9a
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class StopExecuteFunction implements ExecuteFunction<StopRequest, StopResult> {
+
+    private final ProcessInstanceDao processInstanceDao;
+    // todo: Use ApiRpcClient instead of NettyRemotingClient
+    private final ApiRpcClient apiRpcClient;
+
+    public StopExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
+        this.processInstanceDao = processInstanceDao;
+        this.apiRpcClient = apiRpcClient;
+    }
+
+    @Override
+    public StopResult execute(StopRequest request) throws ExecuteRuntimeException {
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+
+        if (!workflowInstance.getState().canStop()
+                || workflowInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s status is %s, can not be stopped",
+                            workflowInstance.getName(), workflowInstance.getState()));
+        }
+        // update the workflow instance's status to stop
+        workflowInstance.setCommandType(CommandType.STOP);
+        workflowInstance.addHistoryCmd(CommandType.STOP);
+        workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, CommandType.STOP.getDescp() + " by user");
+        if (processInstanceDao.updateProcessInstance(workflowInstance) > 0) {
+            log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance",
+                    workflowInstance.getName());
+            // todo: Use specific stop command instead of WorkflowStateEventChangeCommand
+            WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
+                    workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
+            try {
+                apiRpcClient.send(Host.of(workflowInstance.getHost()),
+                        workflowStateEventChangeCommand.convert2Command());
+            } catch (RemotingException e) {
+                throw new ExecuteRuntimeException(
+                        String.format("Workflow instance: %s stop failed, due to send request to master: %s failed",
+                                workflowInstance.getName(), workflowInstance.getHost()),
+                        e);
+            }
+            // todo: use async and inject the completeFuture in the result.
+            return new StopResult(workflowInstance);
+        }
+        throw new ExecuteRuntimeException(
+                "Workflow instance stop failed, due to update the workflow instance status failed");
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return StopExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..2fc06493f0
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class StopExecuteFunctionBuilder implements ExecuteFunctionBuilder<StopRequest, StopResult> {
+
+    public static final ExecuteType EXECUTE_TYPE = ExecuteType.STOP;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+    @Autowired
+    private ApiRpcClient apiRpcClient;
+
+    @Override
+    public CompletableFuture<ExecuteFunction<StopRequest, StopResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
+        return CompletableFuture.completedFuture(new StopExecuteFunction(processInstanceDao, apiRpcClient));
+    }
+
+    @Override
+    public CompletableFuture<StopRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+        return CompletableFuture.completedFuture(new StopRequest(executeContext.getWorkflowInstance()));
+    }
+
+    @Override
+    public ExecuteType getExecuteType() {
+        return EXECUTE_TYPE;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
similarity index 65%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
index 7ba19d6892..55a4b3a108 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
@@ -15,9 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+@AllArgsConstructor
+public class StopRequest implements ExecuteRequest {
+
+    @NonNull
+    private final ProcessInstance workflowInstance;
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
similarity index 65%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
index 7ba19d6892..cf7ddb237e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
@@ -15,9 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
 
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 
-    void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+@AllArgsConstructor
+public class StopResult implements ExecuteResult {
+
+    @NonNull
+    private final ProcessInstance workflowInstance;
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
similarity index 51%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
index 47a1747c4f..17f24dc67e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.dao.repository.impl;
+package org.apache.dolphinscheduler.api.rpc;
 
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
-import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
 
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
+import org.springframework.stereotype.Component;
 
-@Repository
-public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
+@Component
+public class ApiRpcClient {
 
-    @Autowired
-    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+    private final NettyRemotingClient nettyRemotingClient;
 
-    @Override
-    public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
-        processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);
+    public ApiRpcClient() {
+        this.nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
     }
+
+    public void send(Host host, Command command) throws RemotingException {
+        nettyRemotingClient.send(host, command);
+    }
+
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 0fc682d403..12cc8a1829 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import javax.servlet.http.HttpServletResponse;
 
@@ -146,18 +147,22 @@ public interface ProcessDefinitionService {
      * Get resource workflow
      *
      * @param loginUser login user
-     * @param code process definition code
+     * @param code      process definition code
      * @return Process definition Object
      */
     ProcessDefinition getProcessDefinition(User loginUser,
                                            long code);
 
+    Optional<ProcessDefinition> queryWorkflowDefinition(long workflowDefinitionCode, int workflowDefinitionVersion);
+    ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode,
+                                                                      int workflowDefinitionVersion);
+
     /**
      * query detail of process definition
      *
-     * @param loginUser login user
+     * @param loginUser   login user
      * @param projectCode project code
-     * @param name process definition name
+     * @param name        process definition name
      * @return process definition detail
      */
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index eb760722fa..f866fc9313 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -57,6 +57,8 @@ public interface ProcessInstanceService {
                                                  long projectCode,
                                                  Integer processId);
 
+    ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer processId);
+
     /**
      * query process instance by id
      *
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index 54a66c79ec..61e8540eb8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.User;
@@ -66,7 +67,9 @@ public interface ProjectService {
      */
     Map<String, Object> checkProjectAndAuth(User loginUser, Project project, long projectCode, String perm);
 
-    void checkProjectAndAuthThrowException(User loginUser, Project project, String permission);
+    void checkProjectAndAuthThrowException(User loginUser, Project project, String permission) throws ServiceException;
+
+    void checkProjectAndAuthThrowException(User loginUser, long projectCode, String permission) throws ServiceException;
 
     boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String perm);
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 533a254eea..3f2a5f87df 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
@@ -34,8 +36,11 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.executor.ExecuteClient;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.WorkerGroupService;
 import org.apache.dolphinscheduler.common.constants.Constants;
@@ -91,7 +96,6 @@ import org.apache.dolphinscheduler.service.process.TriggerRelationService;
 
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.time.ZonedDateTime;
@@ -101,6 +105,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -111,7 +116,6 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Lists;
 
 /**
@@ -144,10 +148,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     private ProcessService processService;
 
     @Autowired
-    private CommandService commandService;
+    private ProcessInstanceDao processInstanceDao;
 
     @Autowired
-    private ProcessInstanceDao processInstanceDao;
+    private ProcessDefinitionService processDefinitionService;
+
+    @Autowired
+    private CommandService commandService;
 
     @Autowired
     private TaskDefinitionLogMapper taskDefinitionLogMapper;
@@ -169,6 +176,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
     @Autowired
     private TriggerRelationService triggerRelationService;
+
+    @Autowired
+    private ExecuteClient executeClient;
+
     /**
      * execute process instance
      *
@@ -236,15 +247,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             return result;
         }
 
-        if (!checkScheduleTimeNum(commandType, cronTime)) {
-            putMsg(result, Status.SCHEDULE_TIME_NUMBER);
-            return result;
-        }
-
-        // check master exists
-        if (!checkMasterExists(result)) {
-            return result;
-        }
+        checkScheduleTimeNumExceed(commandType, cronTime);
+        checkMasterExists();
 
         long triggerCode = CodeGenerateUtils.getInstance().genCode();
 
@@ -275,46 +279,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         return result;
     }
 
-    /**
-     * check whether master exists
-     *
-     * @param result result
-     * @return master exists return true , otherwise return false
-     */
-    private boolean checkMasterExists(Map<String, Object> result) {
+    private void checkMasterExists() {
         // check master server exists
         List<Server> masterServers = monitorService.getServerListFromRegistry(true);
 
         // no master
         if (masterServers.isEmpty()) {
-            log.error("Master does not exist.");
-            putMsg(result, Status.MASTER_NOT_EXISTS);
-            return false;
+            throw new ServiceException(Status.MASTER_NOT_EXISTS);
         }
-        return true;
     }
 
-    /**
-     * @param complementData
-     * @param cronTime
-     * @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false
-     */
-    private boolean checkScheduleTimeNum(CommandType complementData, String cronTime) {
+    private void checkScheduleTimeNumExceed(CommandType complementData, String cronTime) {
         if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
-            return true;
+            return;
         }
         if (cronTime == null) {
-            return true;
+            return;
         }
         Map<String, String> cronMap = JSONUtils.toMap(cronTime);
         if (cronMap.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
             String[] stringDates = cronMap.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
             if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
                 log.warn("Parameter cornTime is bigger than {}.", SCHEDULE_TIME_MAX_LENGTH);
-                return false;
+                throw new ServiceException(Status.SCHEDULE_TIME_NUMBER_EXCEED);
             }
         }
-        return true;
     }
 
     /**
@@ -322,19 +311,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      *
      * @param projectCode       project code
      * @param processDefinition process definition
-     * @param processDefineCode process definition code
-     * @param version           process instance version
      */
     @Override
     public void checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition,
                                             long processDefineCode, Integer version) {
         // check process definition exists
-        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
-            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
+        if (projectCode != processDefinition.getProjectCode()) {
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefinition.getCode());
         }
         // check process definition online
         if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
-            throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getCode(),
+                    processDefinition.getVersion());
         }
         // check sub process definition online
         if (!checkSubProcessDefinitionValid(processDefinition)) {
@@ -381,8 +369,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
 
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, recover from stop,rerun failed task
-    
-    
      *
      * @param loginUser         login user
      * @param projectCode       project code
@@ -391,103 +377,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      * @return execute result code
      */
     @Override
-    public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId,
+    public Map<String, Object> execute(User loginUser,
+                                       long projectCode,
+                                       Integer processInstanceId,
                                        ExecuteType executeType) {
-        Project project = projectMapper.queryByCode(projectCode);
-        // check user access for project
+        checkNotNull(processInstanceId, "workflowInstanceId cannot be null");
+        checkNotNull(executeType, "executeType cannot be null");
 
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(loginUser, projectCode,
                 ApiFuncIdentificationConstant.map.get(executeType));
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
+        checkMasterExists();
 
-        // check master exists
-        if (!checkMasterExists(result)) {
-            return result;
-        }
+        ProcessInstance workflowInstance =
+                Optional.ofNullable(processInstanceDao.queryByWorkflowInstanceId(processInstanceId))
+                        .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
 
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
-                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
+        checkState(workflowInstance.getProjectCode() == projectCode,
+                "The workflow instance's project code doesn't equals to the given project");
+        ProcessDefinition processDefinition = processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(
+                workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
 
-        ProcessDefinition processDefinition =
-                processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                        processInstance.getProcessDefinitionVersion());
-        processDefinition.setReleaseState(ReleaseState.ONLINE);
-        if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
-            this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
-                    processInstance.getProcessDefinitionVersion());
-        }
+        executeClient.executeWorkflowInstance(new ExecuteContext(
+                workflowInstance,
+                processDefinition,
+                loginUser,
+                executeType));
 
-        result = checkExecuteType(processInstance, executeType);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
-        if (!checkTenantSuitable(processDefinition)) {
-            log.error(
-                    "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
-                    processDefinition.getId(), processDefinition.getName());
-            putMsg(result, Status.TENANT_NOT_SUITABLE);
-        }
-
-        // get the startParams user specified at the first starting while repeat running is needed
-        Map<String, Object> commandMap =
-                JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
-                });
-        String startParams = null;
-        if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
-            Object startParamsJson = commandMap.get(CMD_PARAM_START_PARAMS);
-            if (startParamsJson != null) {
-                startParams = startParamsJson.toString();
-            }
-        }
-
-        switch (executeType) {
-            case REPEAT_RUNNING:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
-                        processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams,
-                        processInstance.getTestFlag());
-                break;
-            case RECOVER_SUSPENDED_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
-                        processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams,
-                        processInstance.getTestFlag());
-                break;
-            case START_FAILURE_TASK_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
-                        processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams,
-                        processInstance.getTestFlag());
-                break;
-            case STOP:
-                if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
-                    log.warn("Process instance status is already {}, processInstanceName:{}.",
-                            WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
-                    putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
-                            processInstance.getState());
-                } else {
-                    result =
-                            updateProcessInstancePrepare(processInstance, CommandType.STOP,
-                                    WorkflowExecutionStatus.READY_STOP);
-                }
-                break;
-            case PAUSE:
-                if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
-                    log.warn("Process instance status is already {}, processInstanceName:{}.",
-                            WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
-                    putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
-                            processInstance.getState());
-                } else {
-                    result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE,
-                            WorkflowExecutionStatus.READY_PAUSE);
-                }
-                break;
-            default:
-                log.warn("Unknown execute type for process instance, processInstanceId:{}.",
-                        processInstance.getId());
-                putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
-
-                break;
-        }
+        Map<String, Object> result = new HashMap<>();
+        result.put(Constants.STATUS, Status.SUCCESS);
         return result;
     }
 
@@ -504,10 +422,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     @Override
     public Map<String, Object> execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType) {
         ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId);
-        ProcessDefinition processDefinition =
-                processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
-
-        return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType);
+        return execute(loginUser, processInstance.getProjectCode(), workflowInstanceId, executeType);
     }
 
     /**
@@ -633,10 +548,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             return result;
         }
 
-        // check master exists
-        if (!checkMasterExists(result)) {
-            return result;
-        }
+        checkMasterExists();
         return forceStart(processInstance, taskGroupQueue);
     }
 
@@ -773,62 +685,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
         return result;
     }
 
-    /**
-     * insert command, used in the implementation of the page, rerun, recovery (pause / failure) execution
-     *
-     * @param loginUser             login user
-     * @param instanceId            instance id
-     * @param processDefinitionCode process definition code
-     * @param processVersion
-     * @param commandType           command type
-     * @return insert result code
-     */
-    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
-                                              int processVersion, CommandType commandType, String startParams,
-                                              int testFlag) {
-        Map<String, Object> result = new HashMap<>();
-
-        // To add startParams only when repeat running is needed
-        Map<String, Object> cmdParam = new HashMap<>();
-        cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
-        if (!StringUtils.isEmpty(startParams)) {
-            cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
-        }
-
-        Command command = new Command();
-        command.setCommandType(commandType);
-        command.setProcessDefinitionCode(processDefinitionCode);
-        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-        command.setExecutorId(loginUser.getId());
-        command.setProcessDefinitionVersion(processVersion);
-        command.setProcessInstanceId(instanceId);
-        command.setTestFlag(testFlag);
-        if (!commandService.verifyIsNeedCreateCommand(command)) {
-            log.warn(
-                    "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
-                    processDefinitionCode, processVersion, instanceId);
-            putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
-            return result;
-        }
-
-        log.info("Creating command, commandInfo:{}.", command);
-        int create = commandService.createCommand(command);
-
-        if (create > 0) {
-            log.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
-                    command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion);
-            putMsg(result, Status.SUCCESS);
-        } else {
-            log.error(
-                    "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
-                    command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion,
-                    instanceId);
-            putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
-        }
-
-        return result;
-    }
-
     /**
      * check whether sub processes are offline before starting process definition
      *
@@ -1319,11 +1175,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             return result;
         }
 
-        // check master exists
-        if (!checkMasterExists(result)) {
-            return result;
-        }
-
+        checkMasterExists();
         // todo dispatch improvement
         List<Server> masterServerList = monitorService.getServerListFromRegistry(true);
         Host host = new Host(masterServerList.get(0).getHost(), masterServerList.get(0).getPort());
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index c7e36efb3a..b8def57492 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -236,6 +236,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Autowired
     private TaskDefinitionMapper taskDefinitionMapper;
 
+    @Lazy
     @Autowired
     private SchedulerService schedulerService;
 
@@ -724,6 +725,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return processDefinition;
     }
 
+    @Override
+    public Optional<ProcessDefinition> queryWorkflowDefinition(long workflowDefinitionCode,
+                                                               int workflowDefinitionVersion) {
+        ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null);
+        if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) {
+            workflowDefinition = processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode,
+                    workflowDefinitionVersion);
+        }
+        return Optional.ofNullable(workflowDefinition);
+    }
+
+    @Override
+    public ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode,
+                                                                             int workflowDefinitionVersion) {
+        return queryWorkflowDefinition(workflowDefinitionCode, workflowDefinitionVersion)
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
+                        String.valueOf(workflowDefinitionCode)));
+    }
+
     @Override
     public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String name) {
         Project project = projectMapper.queryByCode(projectCode);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 9d151e528f..2f7417a4cd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -278,6 +278,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         return result;
     }
 
+    @Override
+    public ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) {
+        ProcessInstance processInstance = processInstanceDao.queryByWorkflowInstanceId(workflowInstanceId);
+        if (processInstance == null) {
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId);
+        }
+        return processInstance;
+    }
+
     /**
      * query workflow instance by id
      *
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index bd1349f4fa..35aeacf798 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -242,6 +242,12 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
         }
     }
 
+    @Override
+    public void checkProjectAndAuthThrowException(User loginUser, long projectCode, String permission) {
+        Project project = projectMapper.queryByCode(projectCode);
+        checkProjectAndAuthThrowException(loginUser, project, permission);
+    }
+
     @Override
     public boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String permission) {
         boolean checkResult = false;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
similarity index 99%
rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
index 68ec7a5c0d..2f5cfb960a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
@@ -55,7 +55,7 @@ import com.google.gson.JsonObject;
 /**
  * executor controller test
  */
-public class ExecutorControllerTest extends AbstractControllerTest {
+public class ExecuteFunctionControllerTest extends AbstractControllerTest {
 
     final Gson gson = new Gson();
     final long projectCode = 1L;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
similarity index 94%
rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
index fe19b5b402..21630b0e1d 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.executor.ExecuteClient;
 import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
-import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.common.constants.Constants;
@@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.process.TriggerRelationService;
@@ -96,11 +97,9 @@ import org.slf4j.LoggerFactory;
  */
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
-public class ExecutorServiceTest {
+public class ExecuteFunctionServiceTest {
 
-    private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
-
-    private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger(ExecuteFunctionServiceTest.class);
 
     @Mock
     private ResourcePermissionCheckService resourcePermissionCheckService;
@@ -147,6 +146,15 @@ public class ExecutorServiceTest {
     @Mock
     private TriggerRelationService triggerRelationService;
 
+    @Mock
+    private ExecuteClient executeClient;
+
+    @Mock
+    private ProcessInstanceDao processInstanceDao;
+
+    @Mock
+    private ProcessDefinitionService processDefinitionService;
+
     private int processDefinitionId = 1;
 
     private int processDefinitionVersion = 1;
@@ -195,6 +203,7 @@ public class ExecutorServiceTest {
 
         // processInstance
         processInstance.setId(processInstanceId);
+        processInstance.setProjectCode(projectCode);
         processInstance.setState(WorkflowExecutionStatus.FAILURE);
         processInstance.setExecutorId(userId);
         processInstance.setTenantId(tenantId);
@@ -453,25 +462,39 @@ public class ExecutorServiceTest {
     public void testNoMasterServers() {
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>());
 
-        Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
+        Assertions.assertThrows(ServiceException.class, () -> executorService.execProcessInstance(
+                loginUser,
+                projectCode,
                 processDefinitionCode,
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.COMPLEMENT_DATA,
-                null, null,
-                null, null, null,
+                null,
+                null,
+                null,
+                null,
+                null,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW,
+                Constants.DEFAULT_WORKER_GROUP,
+                100L,
+                110,
+                null,
+                0,
+                Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
-                ComplementDependentMode.OFF_MODE, null);
-        Assertions.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
+                ComplementDependentMode.OFF_MODE,
+                null));
 
     }
 
     @Test
     public void testExecuteRepeatRunning() {
-        Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
+        when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+        when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
                 .thenReturn(checkProjectAndAuth());
+        when(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)).thenReturn(processInstance);
+        when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode,
+                processDefinitionVersion)).thenReturn(processDefinition);
         Map<String, Object> result =
                 executorService.execute(loginUser, projectCode, processInstanceId, ExecuteType.REPEAT_RUNNING);
         Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index 290be4377c..12d0a69fb7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 
 import java.util.Date;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
@@ -33,6 +36,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 @TableName("t_ds_command")
 public class Command {
 
@@ -55,10 +61,10 @@ public class Command {
     private String commandParam;
 
     @TableField("task_depend_type")
-    private TaskDependType taskDependType;
+    private TaskDependType taskDependType = TaskDependType.TASK_POST;
 
     @TableField("failure_strategy")
-    private FailureStrategy failureStrategy;
+    private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
 
     @TableField("warning_type")
     private WarningType warningType;
@@ -70,13 +76,13 @@ public class Command {
     private Date scheduleTime;
 
     @TableField("start_time")
-    private Date startTime;
+    private Date startTime = new Date();
 
     @TableField("process_instance_priority")
     private Priority processInstancePriority;
 
     @TableField("update_time")
-    private Date updateTime;
+    private Date updateTime = new Date();
 
     @TableField("worker_group")
     private String workerGroup;
@@ -99,13 +105,6 @@ public class Command {
     @TableField("test_flag")
     private int testFlag;
 
-    public Command() {
-        this.taskDependType = TaskDependType.TASK_POST;
-        this.failureStrategy = FailureStrategy.CONTINUE;
-        this.startTime = new Date();
-        this.updateTime = new Date();
-    }
-
     public Command(
                    CommandType commandType,
                    TaskDependType taskDependType,
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
index 7ba19d6892..4f32d142cd 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
@@ -17,7 +17,11 @@
 
 package org.apache.dolphinscheduler.dao.repository;
 
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
 public interface ProcessDefinitionLogDao {
 
+    ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion);
+
     void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index f9002c823d..b249e265e7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
 
     void deleteById(Integer workflowInstanceId);
 
+    ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
index 47a1747c4f..0f757193fb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.dao.repository.impl;
 
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
 
@@ -29,6 +30,12 @@ public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
     @Autowired
     private ProcessDefinitionLogMapper processDefinitionLogMapper;
 
+    @Override
+    public ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion) {
+        return processDefinitionLogMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode,
+                workflowDefinitionVersion);
+    }
+
     @Override
     public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
         processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 2832e0a3f4..ae36ca924e 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -69,4 +69,9 @@ public class ProcessInstanceDaoImpl implements 
ProcessInstanceDao {
     public void deleteById(Integer workflowInstanceId) {
         processInstanceMapper.deleteById(workflowInstanceId);
     }
+
+    @Override
+    public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) {
+        return processInstanceMapper.selectById(workflowInstanceId);
+    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
index 7988bd0459..595823f7b9 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.utils;
 
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -30,8 +32,9 @@ import lombok.experimental.UtilityClass;
 public class ShellUtils {
 
     public List<String> ENV_SOURCE_LIST = Arrays.stream(
-            Optional.ofNullable(PropertyUtils.getString("shell.env_source_list"))
-                    .map(s -> s.split(",")).orElse(new String[0]))
+            Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(","))
+                    .orElse(new String[0]))
             .map(String::trim)
+            .filter(StringUtils::isNotBlank)
             .collect(Collectors.toList());
 }

Reply via email to