This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4351a25f2a Add execute function to handle the workflow instance
operation (#13610)
4351a25f2a is described below
commit 4351a25f2a1d0eb965ebb0db080c46575b0b0e25
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 24 13:47:41 2023 +0800
Add execute function to handle the workflow instance operation (#13610)
---
.../api/controller/ExecutorController.java | 2 -
.../dolphinscheduler/api/enums/ExecuteType.java | 35 ++-
.../apache/dolphinscheduler/api/enums/Status.java | 2 +-
.../api/executor/ExecuteClient.java | 62 +++++
.../api/executor/ExecuteContext.java | 51 +++++
.../ExecuteFunction.java} | 34 ++-
.../api/executor/ExecuteFunctionBuilder.java | 15 +-
.../api/executor/ExecuteRequest.java | 5 +-
.../api/executor/ExecuteResult.java | 5 +-
.../api/executor/ExecuteRuntimeException.java | 46 ++++
.../recovery/FailureRecoveryExecuteFunction.java | 64 ++++++
.../FailureRecoveryExecuteFunctionBuilder.java | 59 +++++
.../failure/recovery/FailureRecoveryRequest.java | 29 +--
.../failure/recovery/FailureRecoveryResult.java | 13 +-
.../instance/pause/pause/PauseExecuteFunction.java | 80 +++++++
.../pause/pause/PauseExecuteFunctionBuilder.java | 60 +++++
.../instance/pause/pause/PauseExecuteRequest.java | 30 +--
.../instance/pause/pause/PauseExecuteResult.java | 12 +-
.../pause/recover/RecoverExecuteFunction.java | 78 +++++++
.../recover/RecoverExecuteFunctionBuilder.java | 60 +++++
.../pause/recover/RecoverExecuteRequest.java | 29 +--
.../pause/recover/RecoverExecuteResult.java | 12 +-
.../rerun/RepeatRunningExecuteFunction.java | 93 ++++++++
.../rerun/RepeatRunningExecuteFunctionBuilder.java | 59 +++++
.../instance/rerun/RepeatRunningRequest.java | 27 +--
.../instance/rerun/RepeatRunningResult.java | 13 +-
.../instance/stop/StopExecuteFunction.java | 87 +++++++
.../instance/stop/StopExecuteFunctionBuilder.java | 56 +++++
.../workflow/instance/stop/StopRequest.java | 16 +-
.../workflow/instance/stop/StopResult.java | 16 +-
.../dolphinscheduler/api/rpc/ApiRpcClient.java | 29 ++-
.../api/service/ProcessDefinitionService.java | 11 +-
.../api/service/ProcessInstanceService.java | 2 +
.../api/service/ProjectService.java | 5 +-
.../api/service/impl/ExecutorServiceImpl.java | 252 +++++----------------
.../service/impl/ProcessDefinitionServiceImpl.java | 20 ++
.../service/impl/ProcessInstanceServiceImpl.java | 9 +
.../api/service/impl/ProjectServiceImpl.java | 6 +
...est.java => ExecuteFunctionControllerTest.java} | 2 +-
...ceTest.java => ExecuteFunctionServiceTest.java} | 49 ++--
.../dolphinscheduler/dao/entity/Command.java | 21 +-
.../dao/repository/ProcessDefinitionLogDao.java | 4 +
.../dao/repository/ProcessInstanceDao.java | 1 +
.../impl/ProcessDefinitionLogDaoImpl.java | 7 +
.../repository/impl/ProcessInstanceDaoImpl.java | 5 +
.../plugin/task/api/utils/ShellUtils.java | 7 +-
46 files changed, 1217 insertions(+), 363 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 16ec43dbdd..4f474821a7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -306,8 +306,6 @@ public class ExecutorController extends BaseController {
@Parameter(name = "projectCode", description =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processInstanceId") Integer
processInstanceId,
@RequestParam("executeType") ExecuteType
executeType) {
- log.info("Start to execute process instance, projectCode:{},
processInstanceId:{}.", projectCode,
- processInstanceId);
Map<String, Object> result = execService.execute(loginUser,
projectCode, processInstanceId, executeType);
return returnDataList(result);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
index fea69bf22d..ff3f122a23 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
@@ -30,14 +30,43 @@ public enum ExecuteType {
* 4 stop
* 5 pause
*/
- NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS,
START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
+ NONE(0, "NONE"),
+
+ // ******************************* Workflow ***************************
+ REPEAT_RUNNING(1, "REPEAT_RUNNING"),
+ RECOVER_SUSPENDED_PROCESS(2, "RECOVER_SUSPENDED_PROCESS"),
+ START_FAILURE_TASK_PROCESS(3, "START_FAILURE_TASK_PROCESS"),
+ STOP(4, "STOP"),
+ PAUSE(5, "PAUSE"),
+ // ******************************* Workflow ***************************
+
+ // ******************************* Task *******************************
+ EXECUTE_TASK(6, "EXECUTE_TASK"),
+ // ******************************* Task *******************************
+ ;
+
+ private final int code;
+ private final String desc;
+
+ ExecuteType(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
public static ExecuteType getEnum(int value) {
for (ExecuteType e : ExecuteType.values()) {
- if (e.ordinal() == value) {
+ if (e.getCode() == value) {
return e;
}
}
- return null;
+ return NONE;
}
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 97db8eb78f..168d0f94ae 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -532,7 +532,7 @@ public enum Status {
NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have
this permission.", "当前用户无此权限"),
FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"),
- SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed
100.", "补数日期个数超过100"),
+ SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates
exceed 100.", "补数日期个数超过100"),
DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error",
"描述过长"),
DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005,
"delete worker group fail, for there are [{0}] enviroments
using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}");
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
new file mode 100644
index 0000000000..f8f685b4b3
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Entry point for executing an operation on a workflow instance.
+ *
+ * <p>Registered as a Spring component. The builders passed to the constructor are indexed by the
+ * {@link ExecuteType} each one reports, and an operation is dispatched to the builder registered
+ * for the execute type carried by the {@link ExecuteContext}.
+ * <pre>
+ *     ExecuteContext executeContext =
+ *             new ExecuteContext(workflowInstance, workflowDefinition, executeUser, executeType);
+ *     ExecuteResult executeResult = executeClient.executeWorkflowInstance(executeContext);
+ * </pre>
+ */
+@Component
+@SuppressWarnings("unchecked")
+public class ExecuteClient {
+
+    private final Map<ExecuteType, ExecuteFunctionBuilder> executorFunctionBuilderMap;
+
+    /**
+     * Indexes the given builders by execute type.
+     *
+     * @param executeFunctionBuilders builders to register; two builders reporting the same
+     *                                execute type make {@code Collectors.toMap} throw
+     *                                {@link IllegalStateException}
+     */
+    public ExecuteClient(List<ExecuteFunctionBuilder> executeFunctionBuilders) {
+        executorFunctionBuilderMap = executeFunctionBuilders.stream()
+                .collect(Collectors.toMap(ExecuteFunctionBuilder::getExecuteType, Function.identity()));
+    }
+
+    /**
+     * Builds the execute function and the request for the context's execute type, runs the
+     * function on the request and returns its result.
+     *
+     * @param executeContext context describing the operation
+     * @return the result produced by the matching execute function
+     * @throws NullPointerException    if no builder is registered for the context's execute type
+     * @throws ExecuteRuntimeException if the execute function (or one of the builder futures)
+     *                                 fails with an {@link ExecuteRuntimeException}
+     */
+    public ExecuteResult executeWorkflowInstance(ExecuteContext executeContext) throws ExecuteRuntimeException {
+        ExecuteFunctionBuilder<ExecuteRequest, ExecuteResult> executeFunctionBuilder = checkNotNull(
+                executorFunctionBuilderMap.get(executeContext.getExecuteType()),
+                String.format("The executeType: %s is not supported", executeContext.getExecuteType()));
+
+        try {
+            return executeFunctionBuilder.createWorkflowInstanceExecuteFunction(executeContext)
+                    .thenCombine(executeFunctionBuilder.createWorkflowInstanceExecuteRequest(executeContext),
+                            ExecuteFunction::execute)
+                    .join();
+        } catch (java.util.concurrent.CompletionException e) {
+            // join() reports any failure of the combined future wrapped in a CompletionException.
+            // Rethrow the original ExecuteRuntimeException so callers see the declared type.
+            if (e.getCause() instanceof ExecuteRuntimeException) {
+                throw (ExecuteRuntimeException) e.getCause();
+            }
+            throw e;
+        }
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
new file mode 100644
index 0000000000..84c100b019
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import lombok.Data;
+
+// todo: to be interface
+@Data
+public class ExecuteContext { // Bundles the inputs of one execute operation; every field is final and checked non-null.
+
+ private final ProcessInstance workflowInstance; // workflow instance the operation targets
+
+ private final ProcessDefinition workflowDefinition; // presumably the definition of workflowInstance; confirm against the caller
+
+ private final User executeUser; // user performing the operation
+
+ private final ExecuteType executeType; // operation to perform; ExecuteClient looks up the builder by this value
+
+ public ExecuteContext(ProcessInstance workflowInstance,
+ ProcessDefinition workflowDefinition,
+ User executeUser,
+ ExecuteType executeType) { // arguments are null-checked in declaration order; checkNotNull throws NullPointerException with the message shown
+ this.workflowInstance = checkNotNull(workflowInstance,
"workflowInstance cannot be null");
+ this.workflowDefinition = checkNotNull(workflowDefinition,
"workflowDefinition cannot be null");
+ this.executeUser = checkNotNull(executeUser, "executeUser cannot be
null");
+ this.executeType = checkNotNull(executeType, "executeType cannot be
null");
+ }
+
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
similarity index 58%
copy from
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
index fea69bf22d..8945c0d678 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
@@ -15,29 +15,23 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.enums;
+package org.apache.dolphinscheduler.api.executor;
-/**
- * execute type
- */
-public enum ExecuteType {
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
+public interface ExecuteFunction<Request extends ExecuteRequest, Result
extends ExecuteResult> {
/**
- * operation type
- * 1 repeat running
- * 2 resume pause
- * 3 resume failure
- * 4 stop
- * 5 pause
+ * Execute the workflow by the given request.
+ *
+ * @param request execute request
+ * @return execute result
+ * @throws ExecuteRuntimeException If there is an exception during
execution, it will be thrown.
*/
- NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS,
START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
+ Result execute(Request request) throws ExecuteRuntimeException;
- public static ExecuteType getEnum(int value) {
- for (ExecuteType e : ExecuteType.values()) {
- if (e.ordinal() == value) {
- return e;
- }
- }
- return null;
- }
+ /**
+ * @return the type of the executor
+ */
+ ExecuteType getExecuteType();
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
similarity index 60%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
index 7ba19d6892..49dc9e9f8d 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
@@ -15,9 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface ExecuteFunctionBuilder<Request extends ExecuteRequest, Result
extends ExecuteResult> {
+
+ CompletableFuture<ExecuteFunction<Request, Result>>
createWorkflowInstanceExecuteFunction(ExecuteContext executeContext);
+
+ CompletableFuture<Request>
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext);
+
+ ExecuteType getExecuteType();
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
similarity index 83%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
index 7ba19d6892..d5b02c5eba 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor;
-public interface ProcessDefinitionLogDao {
+public interface ExecuteRequest {
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
similarity index 83%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
index 7ba19d6892..52ae3c8d5f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor;
-public interface ProcessDefinitionLogDao {
+public interface ExecuteResult {
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
new file mode 100644
index 0000000000..1cdebb0952
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor;
+
+/**
+ * Unchecked exception signalling that an execute operation could not be completed.
+ */
+// todo: implement from DolphinSchedulerRuntimeException
+public class ExecuteRuntimeException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Message template; arguments are the workflow instance name and the execute type. */
+    private static final String EXECUTE_WORKFLOW_INSTANCE_ERROR =
+            "Execute workflow instance %s failed, execute type is %s";
+
+    /**
+     * @param message description of the failure
+     */
+    public ExecuteRuntimeException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param cause   underlying failure, may be {@code null}
+     */
+    public ExecuteRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception for a failed workflow instance operation without an underlying cause.
+     *
+     * @param executeContext context of the failed operation
+     * @return the exception to throw
+     */
+    public static ExecuteRuntimeException executeWorkflowInstanceError(ExecuteContext executeContext) {
+        return executeWorkflowInstanceError(executeContext, null);
+    }
+
+    /**
+     * Creates an exception whose message names the workflow instance and the execute type taken
+     * from the given context.
+     *
+     * @param executeContext context of the failed operation
+     * @param cause          underlying failure, may be {@code null}
+     * @return the exception to throw
+     */
+    public static ExecuteRuntimeException executeWorkflowInstanceError(ExecuteContext executeContext,
+                                                                       Throwable cause) {
+        final String message = String.format(
+                EXECUTE_WORKFLOW_INSTANCE_ERROR,
+                executeContext.getWorkflowInstance().getName(),
+                executeContext.getExecuteType());
+        return new ExecuteRuntimeException(message, cause);
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
new file mode 100644
index 0000000000..546e632b80
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+/**
+ * Execute function that recovers a failed workflow instance by inserting a
+ * {@code START_FAILURE_TASK_PROCESS} command through the {@link CommandService}.
+ */
+public class FailureRecoveryExecuteFunction implements ExecuteFunction<FailureRecoveryRequest, FailureRecoveryResult> {
+
+    private final CommandService commandService;
+
+    public FailureRecoveryExecuteFunction(CommandService commandService) {
+        this.commandService = commandService;
+    }
+
+    /**
+     * Creates the recovery command for the request's workflow instance.
+     *
+     * @param request request carrying the workflow instance and the executing user
+     * @return result holding the id of the created command
+     * @throws ExecuteRuntimeException if the instance is not in a failure state, or the command
+     *                                 could not be created
+     */
+    @Override
+    public FailureRecoveryResult execute(FailureRecoveryRequest request) throws ExecuteRuntimeException {
+        final ProcessInstance failedInstance = request.getWorkflowInstance();
+        if (!failedInstance.getState().isFailure()) {
+            final String reason = String.format(
+                    "The workflow instance: %s status is %s, can not be recovered",
+                    failedInstance.getName(),
+                    failedInstance.getState());
+            throw new ExecuteRuntimeException(reason);
+        }
+
+        final Command recoveryCommand = buildRecoveryCommand(failedInstance, request);
+        final boolean created = commandService.createCommand(recoveryCommand) > 0;
+        if (!created) {
+            throw new ExecuteRuntimeException(
+                    "Failure recovery workflow instance failed, due to insert command to db failed");
+        }
+        return new FailureRecoveryResult(recoveryCommand.getId());
+    }
+
+    /** Assembles the START_FAILURE_TASK_PROCESS command from the instance and the executing user. */
+    private Command buildRecoveryCommand(ProcessInstance failedInstance, FailureRecoveryRequest request) {
+        return Command.builder()
+                .commandType(CommandType.START_FAILURE_TASK_PROCESS)
+                .processDefinitionCode(failedInstance.getProcessDefinitionCode())
+                .processDefinitionVersion(failedInstance.getProcessDefinitionVersion())
+                .processInstanceId(failedInstance.getId())
+                .executorId(request.getExecuteUser().getId())
+                .testFlag(failedInstance.getTestFlag())
+                .build();
+    }
+
+    /**
+     * @return the execute type declared by {@link FailureRecoveryExecuteFunctionBuilder}
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return FailureRecoveryExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..1509220bee
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Builder for the failure-recovery operation: supplies a {@link FailureRecoveryExecuteFunction}
+ * and the matching {@link FailureRecoveryRequest} as already-completed futures.
+ */
+@Component
+public class FailureRecoveryExecuteFunctionBuilder
+        implements
+            ExecuteFunctionBuilder<FailureRecoveryRequest, FailureRecoveryResult> {
+
+    public static final ExecuteType EXECUTE_TYPE = ExecuteType.START_FAILURE_TASK_PROCESS;
+
+    @Autowired
+    private CommandService commandService;
+
+    /**
+     * @param executeContext context of the operation (not read by this builder)
+     * @return a completed future holding a new execute function backed by the command service
+     */
+    @Override
+    public CompletableFuture<ExecuteFunction<FailureRecoveryRequest, FailureRecoveryResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
+        ExecuteFunction<FailureRecoveryRequest, FailureRecoveryResult> recoveryFunction =
+                new FailureRecoveryExecuteFunction(commandService);
+        return CompletableFuture.completedFuture(recoveryFunction);
+    }
+
+    /**
+     * @param executeContext context supplying the workflow instance, its definition and the user
+     * @return a completed future holding the request built from the context
+     */
+    @Override
+    public CompletableFuture<FailureRecoveryRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+        FailureRecoveryRequest recoveryRequest = new FailureRecoveryRequest(
+                executeContext.getWorkflowInstance(),
+                executeContext.getWorkflowDefinition(),
+                executeContext.getExecuteUser());
+        return CompletableFuture.completedFuture(recoveryRequest);
+    }
+
+    /**
+     * @return {@link #EXECUTE_TYPE}
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return EXECUTE_TYPE;
+    }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
similarity index 60%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
index f9002c823d..e6e7231540 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-public interface ProcessInstanceDao {
+@Data
+@AllArgsConstructor
+public class FailureRecoveryRequest implements ExecuteRequest {
- public int insertProcessInstance(ProcessInstance processInstance);
-
- public int updateProcessInstance(ProcessInstance processInstance);
-
- /**
- * insert or update work process instance to database
- *
- * @param processInstance processInstance
- */
- public int upsertProcessInstance(ProcessInstance processInstance);
-
- void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
- void deleteById(Integer workflowInstanceId);
+ private final ProcessInstance workflowInstance;
+ private final ProcessDefinition workflowDefinition;
+ private final User executeUser;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
similarity index 71%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
index 7ba19d6892..c75ed591c7 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
@@ -15,9 +15,16 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class FailureRecoveryResult implements ExecuteResult {
+
+ private final Integer commandId;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
new file mode 100644
index 0000000000..83711660f8
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import
org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+/**
+ * Execute function that pauses a running workflow instance: it marks the instance
+ * {@code READY_PAUSE}, persists the change and then sends a state-change command to the master
+ * host recorded on the instance.
+ */
+public class PauseExecuteFunction implements ExecuteFunction<PauseExecuteRequest, PauseExecuteResult> {
+
+    private final ProcessInstanceDao processInstanceDao;
+
+    private final ApiRpcClient apiRpcClient;
+
+    public PauseExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
+        this.processInstanceDao = processInstanceDao;
+        this.apiRpcClient = apiRpcClient;
+    }
+
+    /**
+     * Pauses the workflow instance carried by the request.
+     *
+     * @param request request carrying the workflow instance and the executing user
+     * @return result wrapping the updated workflow instance
+     * @throws ExecuteRuntimeException if the instance is not running, the update in the database
+     *                                 fails, or the RPC request to the master fails
+     */
+    @Override
+    public PauseExecuteResult execute(PauseExecuteRequest request) throws ExecuteRuntimeException {
+        final ProcessInstance runningInstance = request.getWorkflowInstance();
+        if (!runningInstance.getState().isRunning()) {
+            final String reason = String.format(
+                    "The workflow instance: %s status is %s, can not pause",
+                    runningInstance.getName(),
+                    runningInstance.getState());
+            throw new ExecuteRuntimeException(reason);
+        }
+
+        // The in-memory instance is updated first; the same object is then written to the DB.
+        runningInstance.setCommandType(CommandType.PAUSE);
+        runningInstance.addHistoryCmd(CommandType.PAUSE);
+        final String stateDesc = CommandType.PAUSE.getDescp() + " by " + request.getExecuteUser().getUserName();
+        runningInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, stateDesc);
+
+        final boolean persisted = processInstanceDao.updateProcessInstance(runningInstance) > 0;
+        if (!persisted) {
+            throw new ExecuteRuntimeException(
+                    String.format(
+                            "The workflow instance: %s pause failed, due to update the workflow instance status in DB failed",
+                            runningInstance.getName()));
+        }
+
+        notifyMaster(runningInstance);
+        return new PauseExecuteResult(runningInstance);
+    }
+
+    /** Sends the state-change command for the instance to the master host stored on it. */
+    private void notifyMaster(ProcessInstance runningInstance) {
+        final WorkflowStateEventChangeCommand stateChange = new WorkflowStateEventChangeCommand(
+                runningInstance.getId(), 0, runningInstance.getState(), runningInstance.getId(), 0);
+        try {
+            apiRpcClient.send(Host.of(runningInstance.getHost()), stateChange.convert2Command());
+        } catch (RemotingException e) {
+            throw new ExecuteRuntimeException(
+                    String.format(
+                            "The workflow instance: %s pause failed, due to send rpc request to master: %s failed",
+                            runningInstance.getName(),
+                            runningInstance.getHost()),
+                    e);
+        }
+    }
+
+    /**
+     * @return the execute type declared by {@link PauseExecuteFunctionBuilder}
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return PauseExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..2e8cf21afd
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class PauseExecuteFunctionBuilder implements
ExecuteFunctionBuilder<PauseExecuteRequest, PauseExecuteResult> {
+
+ public static final ExecuteType EXECUTE_TYPE = ExecuteType.PAUSE;
+
+ @Autowired
+ private ProcessInstanceDao processInstanceDao;
+ @Autowired
+ private ApiRpcClient apiRpcClient;
+
+ @Override
+ public CompletableFuture<ExecuteFunction<PauseExecuteRequest,
PauseExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext
executeContext) {
+ return CompletableFuture.completedFuture(new
PauseExecuteFunction(processInstanceDao, apiRpcClient));
+ }
+
+ @Override
+ public CompletableFuture<PauseExecuteRequest>
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+ return CompletableFuture.completedFuture(
+ new PauseExecuteRequest(
+ executeContext.getWorkflowDefinition(),
+ executeContext.getWorkflowInstance(),
+ executeContext.getExecuteUser()));
+ }
+
+ @Override
+ public ExecuteType getExecuteType() {
+ return EXECUTE_TYPE;
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
similarity index 60%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
index f9002c823d..02cb471961 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
@@ -15,27 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-public interface ProcessInstanceDao {
-
- public int insertProcessInstance(ProcessInstance processInstance);
-
- public int updateProcessInstance(ProcessInstance processInstance);
-
- /**
- * insert or update work process instance to database
- *
- * @param processInstance processInstance
- */
- public int upsertProcessInstance(ProcessInstance processInstance);
-
- void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
- void deleteById(Integer workflowInstanceId);
+/**
+ * Input of the pause operation: the workflow definition, the workflow instance to act on and the executing
+ * user. All fields are final; Lombok generates the getters and an all-args constructor whose parameters
+ * follow the field declaration order.
+ */
+@Data
+@AllArgsConstructor
+public class PauseExecuteRequest implements ExecuteRequest {
+ private final ProcessDefinition processDefinition;
+ private final ProcessInstance workflowInstance;
+ private final User executeUser;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
similarity index 68%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
index 7ba19d6892..38bbc03c07 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
@@ -15,9 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+
+@AllArgsConstructor
+public class PauseExecuteResult implements ExecuteResult {
+
+ private final ProcessInstance processInstance;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
new file mode 100644
index 0000000000..149e1abd29
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
+
+import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Recovers a paused workflow instance by inserting a {@link CommandType#RECOVER_SUSPENDED_PROCESS} command
+ * through the {@link CommandService}.
+ */
+public class RecoverExecuteFunction implements ExecuteFunction<RecoverExecuteRequest, RecoverExecuteResult> {
+
+    private final CommandService commandService;
+
+    public RecoverExecuteFunction(CommandService commandService) {
+        this.commandService = commandService;
+    }
+
+    /**
+     * Checks that the workflow instance is paused, then creates the recover command for it.
+     *
+     * @param request carries the workflow instance to recover and the executing user
+     * @return a result wrapping the command that was handed to the command service
+     * @throws ExecuteRuntimeException if the instance state is missing or not a pause state, or if
+     *                                 {@code createCommand} reports no inserted row
+     */
+    @Override
+    public RecoverExecuteResult execute(RecoverExecuteRequest request) throws ExecuteRuntimeException {
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+        // A missing state is rejected like any other non-paused state instead of surfacing as a
+        // NullPointerException.
+        if (workflowInstance.getState() == null || !workflowInstance.getState().isPause()) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(),
+                            workflowInstance.getState()));
+        }
+        Command command = Command.builder()
+                .commandType(CommandType.RECOVER_SUSPENDED_PROCESS)
+                .processDefinitionCode(workflowInstance.getProcessDefinitionCode())
+                .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
+                .processInstanceId(workflowInstance.getId())
+                .commandParam(JSONUtils.toJsonString(createCommandParam(workflowInstance)))
+                .executorId(request.getExecuteUser().getId())
+                .testFlag(workflowInstance.getTestFlag())
+                .build();
+        if (commandService.createCommand(command) <= 0) {
+            throw new ExecuteRuntimeException(
+                    String.format("Recovery workflow instance: %s failed, due to insert command to db failed",
+                            workflowInstance.getName()));
+        }
+        return new RecoverExecuteResult(command);
+    }
+
+    /**
+     * Builds the command parameters: a single entry mapping
+     * {@code CMD_PARAM_RECOVER_PROCESS_ID_STRING} to the workflow instance id.
+     */
+    private Map<String, Object> createCommandParam(ProcessInstance workflowInstance) {
+        return new ImmutableMap.Builder<String, Object>()
+                .put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId())
+                .build();
+    }
+
+    /**
+     * @return the execute type declared by {@link RecoverExecuteFunctionBuilder}
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return RecoverExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..cbd88a0d2d
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class RecoverExecuteFunctionBuilder
+ implements
+ ExecuteFunctionBuilder<RecoverExecuteRequest,
RecoverExecuteResult> {
+
+ public static final ExecuteType EXECUTE_TYPE =
ExecuteType.RECOVER_SUSPENDED_PROCESS;
+
+ @Autowired
+ private CommandService commandService;
+
+ @Override
+ public CompletableFuture<ExecuteFunction<RecoverExecuteRequest,
RecoverExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext
executeContext) {
+ return CompletableFuture.completedFuture(
+ new RecoverExecuteFunction(commandService));
+ }
+
+ @Override
+ public CompletableFuture<RecoverExecuteRequest>
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+ return CompletableFuture.completedFuture(
+ new RecoverExecuteRequest(
+ executeContext.getWorkflowInstance(),
+ executeContext.getWorkflowDefinition(),
+ executeContext.getExecuteUser()));
+ }
+
+ @Override
+ public ExecuteType getExecuteType() {
+ return EXECUTE_TYPE;
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
similarity index 60%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
index f9002c823d..8f8b8f4e84 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-public interface ProcessInstanceDao {
+/**
+ * Input of the recover operation: the workflow instance to act on, its workflow definition and the
+ * executing user. All fields are final; Lombok generates the getters and an all-args constructor whose
+ * parameters follow the field declaration order.
+ */
+@Data
+@AllArgsConstructor
+public class RecoverExecuteRequest implements ExecuteRequest {
- public int insertProcessInstance(ProcessInstance processInstance);
-
- public int updateProcessInstance(ProcessInstance processInstance);
-
- /**
- * insert or update work process instance to database
- *
- * @param processInstance processInstance
- */
- public int upsertProcessInstance(ProcessInstance processInstance);
-
- void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
- void deleteById(Integer workflowInstanceId);
+ private final ProcessInstance workflowInstance;
+ private final ProcessDefinition processDefinition;
+ private final User executeUser;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
similarity index 69%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
index 7ba19d6892..882174cddd 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
@@ -15,9 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package
org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
+import org.apache.dolphinscheduler.dao.entity.Command;
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+
+@AllArgsConstructor
+public class RecoverExecuteResult implements ExecuteResult {
+
+ private final Command command;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
new file mode 100644
index 0000000000..82c59b907f
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+/**
+ * Re-runs a finished workflow instance by inserting a {@link CommandType#REPEAT_RUNNING} command through
+ * the {@link CommandService}.
+ */
+public class RepeatRunningExecuteFunction implements ExecuteFunction<RepeatRunningRequest, RepeatRunningResult> {
+
+    private final CommandService commandService;
+
+    public RepeatRunningExecuteFunction(CommandService commandService) {
+        this.commandService = commandService;
+    }
+
+    /**
+     * Checks that the workflow instance has finished, then creates the repeat-running command for it.
+     *
+     * @param request carries the workflow instance to re-run and the executing user; must not be null
+     * @return a result holding {@code command.getId()} as read after the command was created
+     * @throws ExecuteRuntimeException if the instance state is missing or not finished, or if
+     *                                 {@code createCommand} reports no inserted row
+     */
+    @Override
+    public RepeatRunningResult execute(RepeatRunningRequest request) throws ExecuteRuntimeException {
+        checkNotNull(request, "request cannot be null");
+        // todo: check workflow definition valid? or we don't need to do this check, since we will check in master
+        // again.
+        // todo: check tenant valid? or we don't need to do this check, since we need to check in master again.
+        ProcessInstance workflowInstance = request.getWorkflowInstance();
+        if (workflowInstance.getState() == null || !workflowInstance.getState().isFinished()) {
+            throw new ExecuteRuntimeException(
+                    String.format("The workflow instance: %s status is %s, cannot repeat running",
+                            workflowInstance.getName(), workflowInstance.getState()));
+        }
+        Command command = Command.builder()
+                .commandType(CommandType.REPEAT_RUNNING)
+                .commandParam(JSONUtils.toJsonString(createCommandParams(workflowInstance)))
+                .processDefinitionCode(workflowInstance.getProcessDefinitionCode())
+                .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
+                .processInstanceId(workflowInstance.getId())
+                .processInstancePriority(workflowInstance.getProcessInstancePriority())
+                // Record the requesting user on the command so the executor is not left at its default value.
+                .executorId(request.getExecuteUser().getId())
+                .testFlag(workflowInstance.getTestFlag())
+                .build();
+        if (commandService.createCommand(command) <= 0) {
+            throw new ExecuteRuntimeException(
+                    String.format("Repeat running workflow instance: %s failed, due to insert command to db failed",
+                            workflowInstance.getName()));
+        }
+        return new RepeatRunningResult(command.getId());
+    }
+
+    /**
+     * @return the execute type declared by {@link RepeatRunningExecuteFunctionBuilder}
+     */
+    @Override
+    public ExecuteType getExecuteType() {
+        return RepeatRunningExecuteFunctionBuilder.EXECUTE_TYPE;
+    }
+
+    /**
+     * Builds the parameters of the repeat-running command: the {@code CMD_PARAM_START_PARAMS} entry of the
+     * instance's own command param is carried over when present, and the instance id is always stored under
+     * {@code CMD_PARAM_RECOVER_PROCESS_ID_STRING}.
+     */
+    private Map<String, Object> createCommandParams(ProcessInstance workflowInstance) {
+        Map<String, Object> commandMap =
+                JSONUtils.parseObject(workflowInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
+                });
+        Map<String, Object> repeatRunningCommandParams = new HashMap<>();
+        Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS))
+                .ifPresent(startParams -> repeatRunningCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+        repeatRunningCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId());
+        return repeatRunningCommandParams;
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..f5363f90da
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.service.command.CommandService;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class RepeatRunningExecuteFunctionBuilder
+ implements
+ ExecuteFunctionBuilder<RepeatRunningRequest, RepeatRunningResult> {
+
+ public static final ExecuteType EXECUTE_TYPE = ExecuteType.REPEAT_RUNNING;
+
+ @Autowired
+ private CommandService commandService;
+
+ @Override
+ public CompletableFuture<ExecuteFunction<RepeatRunningRequest,
RepeatRunningResult>> createWorkflowInstanceExecuteFunction(ExecuteContext
executeContext) {
+ return CompletableFuture.completedFuture(new
RepeatRunningExecuteFunction(commandService));
+ }
+
+ @Override
+ public CompletableFuture<RepeatRunningRequest>
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+ return CompletableFuture.completedFuture(
+ new RepeatRunningRequest(
+ executeContext.getWorkflowInstance(),
+ executeContext.getWorkflowDefinition(),
+ executeContext.getExecuteUser()));
+ }
+
+ @Override
+ public ExecuteType getExecuteType() {
+ return EXECUTE_TYPE;
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
similarity index 60%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
index f9002c823d..c0631190c2 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
-import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-public interface ProcessInstanceDao {
+/**
+ * Input of the repeat-running operation: the workflow instance to act on, its workflow definition and the
+ * executing user. All fields are final; Lombok generates the getters and an all-args constructor whose
+ * parameters follow the field declaration order.
+ */
+@Data
+@AllArgsConstructor
+public class RepeatRunningRequest implements ExecuteRequest {
- public int insertProcessInstance(ProcessInstance processInstance);
+ private final ProcessInstance workflowInstance;
- public int updateProcessInstance(ProcessInstance processInstance);
+ private final ProcessDefinition processDefinition;
- /**
- * insert or update work process instance to database
- *
- * @param processInstance processInstance
- */
- public int upsertProcessInstance(ProcessInstance processInstance);
-
- void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
-
- void deleteById(Integer workflowInstanceId);
+ private final User executeUser;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
similarity index 71%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
index 7ba19d6892..9fa7ed0004 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
@@ -15,9 +15,16 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Result of a repeat-running operation. It holds a single command id, which Lombok exposes through a
+ * generated getter; the value is whatever the creator passes to the all-args constructor.
+ */
+@Data
+@AllArgsConstructor
+public class RepeatRunningResult implements ExecuteResult {
+
+ private final Integer commandId;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
new file mode 100644
index 0000000000..2e37423e9a
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import
org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class StopExecuteFunction implements ExecuteFunction<StopRequest,
StopResult> {
+
+ private final ProcessInstanceDao processInstanceDao;
+ // todo: Use ApiRpcClient instead of NettyRemotingClient
+ private final ApiRpcClient apiRpcClient;
+
+ public StopExecuteFunction(ProcessInstanceDao processInstanceDao,
ApiRpcClient apiRpcClient) {
+ this.processInstanceDao = processInstanceDao;
+ this.apiRpcClient = apiRpcClient;
+ }
+
+ @Override
+ public StopResult execute(StopRequest request) throws
ExecuteRuntimeException {
+ ProcessInstance workflowInstance = request.getWorkflowInstance();
+
+ if (!workflowInstance.getState().canStop()
+ || workflowInstance.getState() ==
WorkflowExecutionStatus.READY_STOP) {
+ throw new ExecuteRuntimeException(
+ String.format("The workflow instance: %s status is %s, can
not be stopped",
+ workflowInstance.getName(),
workflowInstance.getState()));
+ }
+ // update the workflow instance's status to stop
+ workflowInstance.setCommandType(CommandType.STOP);
+ workflowInstance.addHistoryCmd(CommandType.STOP);
+ workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP,
CommandType.STOP.getDescp() + " by user");
+ if (processInstanceDao.updateProcessInstance(workflowInstance) > 0) {
+ log.info("Workflow instance {} ready to stop success, will call
master to stop the workflow instance",
+ workflowInstance.getName());
+ // todo: Use specific stop command instead of
WorkflowStateEventChangeCommand
+ WorkflowStateEventChangeCommand workflowStateEventChangeCommand =
new WorkflowStateEventChangeCommand(
+ workflowInstance.getId(), 0, workflowInstance.getState(),
workflowInstance.getId(), 0);
+ try {
+ apiRpcClient.send(Host.of(workflowInstance.getHost()),
+ workflowStateEventChangeCommand.convert2Command());
+ } catch (RemotingException e) {
+ throw new ExecuteRuntimeException(
+ String.format("Workflow instance: %s stop failed, due
to send request to master: %s failed",
+ workflowInstance.getName(),
workflowInstance.getHost()),
+ e);
+ }
+ // todo: use async and inject the completeFuture in the result.
+ return new StopResult(workflowInstance);
+ }
+ throw new ExecuteRuntimeException(
+ "Workflow instance stop failed, due to update the workflow
instance status failed");
+ }
+
+ @Override
+ public ExecuteType getExecuteType() {
+ return StopExecuteFunctionBuilder.EXECUTE_TYPE;
+ }
+
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java
new file mode 100644
index 0000000000..2fc06493f0
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
+
+import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
+import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
+import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class StopExecuteFunctionBuilder implements
ExecuteFunctionBuilder<StopRequest, StopResult> {
+
+ public static final ExecuteType EXECUTE_TYPE = ExecuteType.STOP;
+
+ @Autowired
+ private ProcessInstanceDao processInstanceDao;
+ @Autowired
+ private ApiRpcClient apiRpcClient;
+
+ @Override
+ public CompletableFuture<ExecuteFunction<StopRequest, StopResult>>
createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
+ return CompletableFuture.completedFuture(new
StopExecuteFunction(processInstanceDao, apiRpcClient));
+ }
+
+ @Override
+ public CompletableFuture<StopRequest>
createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
+ return CompletableFuture.completedFuture(new
StopRequest(executeContext.getWorkflowInstance()));
+ }
+
+ @Override
+ public ExecuteType getExecuteType() {
+ return EXECUTE_TYPE;
+ }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
similarity index 65%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
index 7ba19d6892..55a4b3a108 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
@@ -15,9 +15,19 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+@AllArgsConstructor
+public class StopRequest implements ExecuteRequest {
+
+ @NonNull
+ private final ProcessInstance workflowInstance;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
similarity index 65%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
index 7ba19d6892..cf7ddb237e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
@@ -15,9 +15,19 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository;
+package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
-public interface ProcessDefinitionLogDao {
+import org.apache.dolphinscheduler.api.executor.ExecuteResult;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
- void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+@AllArgsConstructor
+public class StopResult implements ExecuteResult {
+
+ @NonNull
+ private final ProcessInstance workflowInstance;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
similarity index 51%
copy from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
index 47a1747c4f..17f24dc67e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
@@ -15,22 +15,27 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.repository.impl;
+package org.apache.dolphinscheduler.api.rpc;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
-import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
+import org.springframework.stereotype.Component;
-@Repository
-public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
+@Component
+public class ApiRpcClient {
- @Autowired
- private ProcessDefinitionLogMapper processDefinitionLogMapper;
+ private final NettyRemotingClient nettyRemotingClient;
- @Override
- public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
-
processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);
+ public ApiRpcClient() {
+ this.nettyRemotingClient = new NettyRemotingClient(new
NettyClientConfig());
}
+
+ public void send(Host host, Command command) throws RemotingException {
+ nettyRemotingClient.send(host, command);
+ }
+
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 0fc682d403..12cc8a1829 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import javax.servlet.http.HttpServletResponse;
@@ -146,18 +147,22 @@ public interface ProcessDefinitionService {
* Get resource workflow
*
* @param loginUser login user
- * @param code process definition code
+ * @param code process definition code
* @return Process definition Object
*/
ProcessDefinition getProcessDefinition(User loginUser,
long code);
+ Optional<ProcessDefinition> queryWorkflowDefinition(long
workflowDefinitionCode, int workflowDefinitionVersion);
+ ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long
workflowDefinitionCode,
+ int
workflowDefinitionVersion);
+
/**
* query detail of process definition
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param name process definition name
+ * @param name process definition name
* @return process definition detail
*/
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index eb760722fa..f866fc9313 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -57,6 +57,8 @@ public interface ProcessInstanceService {
long projectCode,
Integer processId);
+ ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer
processId);
+
/**
* query process instance by id
*
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index 54a66c79ec..61e8540eb8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -66,7 +67,9 @@ public interface ProjectService {
*/
Map<String, Object> checkProjectAndAuth(User loginUser, Project project,
long projectCode, String perm);
- void checkProjectAndAuthThrowException(User loginUser, Project project,
String permission);
+ void checkProjectAndAuthThrowException(User loginUser, Project project,
String permission) throws ServiceException;
+
+ void checkProjectAndAuthThrowException(User loginUser, long projectCode,
String permission) throws ServiceException;
boolean hasProjectAndPerm(User loginUser, Project project, Map<String,
Object> result, String perm);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 533a254eea..3f2a5f87df 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
@@ -34,8 +36,11 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.executor.ExecuteClient;
+import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants;
@@ -91,7 +96,6 @@ import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.time.ZonedDateTime;
@@ -101,6 +105,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -111,7 +116,6 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
/**
@@ -144,10 +148,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private ProcessService processService;
@Autowired
- private CommandService commandService;
+ private ProcessInstanceDao processInstanceDao;
@Autowired
- private ProcessInstanceDao processInstanceDao;
+ private ProcessDefinitionService processDefinitionService;
+
+ @Autowired
+ private CommandService commandService;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@@ -169,6 +176,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private TriggerRelationService triggerRelationService;
+
+ @Autowired
+ private ExecuteClient executeClient;
+
/**
* execute process instance
*
@@ -236,15 +247,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
- if (!checkScheduleTimeNum(commandType, cronTime)) {
- putMsg(result, Status.SCHEDULE_TIME_NUMBER);
- return result;
- }
-
- // check master exists
- if (!checkMasterExists(result)) {
- return result;
- }
+ checkScheduleTimeNumExceed(commandType, cronTime);
+ checkMasterExists();
long triggerCode = CodeGenerateUtils.getInstance().genCode();
@@ -275,46 +279,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
- /**
- * check whether master exists
- *
- * @param result result
- * @return master exists return true , otherwise return false
- */
- private boolean checkMasterExists(Map<String, Object> result) {
+ private void checkMasterExists() {
// check master server exists
List<Server> masterServers =
monitorService.getServerListFromRegistry(true);
// no master
if (masterServers.isEmpty()) {
- log.error("Master does not exist.");
- putMsg(result, Status.MASTER_NOT_EXISTS);
- return false;
+ throw new ServiceException(Status.MASTER_NOT_EXISTS);
}
- return true;
}
- /**
- * @param complementData
- * @param cronTime
- * @return CommandType is COMPLEMENT_DATA and cronTime's number is not
greater than 100 return true , otherwise return false
- */
- private boolean checkScheduleTimeNum(CommandType complementData, String
cronTime) {
+ private void checkScheduleTimeNumExceed(CommandType complementData, String
cronTime) {
if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
- return true;
+ return;
}
if (cronTime == null) {
- return true;
+ return;
}
Map<String, String> cronMap = JSONUtils.toMap(cronTime);
if (cronMap.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST))
{
String[] stringDates =
cronMap.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
log.warn("Parameter cornTime is bigger than {}.",
SCHEDULE_TIME_MAX_LENGTH);
- return false;
+ throw new ServiceException(Status.SCHEDULE_TIME_NUMBER_EXCEED);
}
}
- return true;
}
/**
@@ -322,19 +311,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*
* @param projectCode project code
* @param processDefinition process definition
- * @param processDefineCode process definition code
- * @param version process instance version
*/
@Override
public void checkProcessDefinitionValid(long projectCode,
ProcessDefinition processDefinition,
long processDefineCode, Integer
version) {
// check process definition exists
- if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
- throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(processDefineCode));
+ if (projectCode != processDefinition.getProjectCode()) {
+ throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
processDefinition.getCode());
}
// check process definition online
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
- throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(processDefineCode), version);
+ throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
processDefinition.getCode(),
+ processDefinition.getVersion());
}
// check sub process definition online
if (!checkSubProcessDefinitionValid(processDefinition)) {
@@ -381,8 +369,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* do action to process instance:pause, stop, repeat, recover from pause, recover from stop,rerun failed task
-
-
*
* @param loginUser login user
* @param projectCode project code
@@ -391,103 +377,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return execute result code
*/
@Override
- public Map<String, Object> execute(User loginUser, long projectCode,
Integer processInstanceId,
+ public Map<String, Object> execute(User loginUser,
+ long projectCode,
+ Integer processInstanceId,
ExecuteType executeType) {
- Project project = projectMapper.queryByCode(projectCode);
- // check user access for project
+ checkNotNull(processInstanceId, "workflowInstanceId cannot be null");
+ checkNotNull(executeType, "executeType cannot be null");
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
+ // check user access for project
+ projectService.checkProjectAndAuthThrowException(loginUser,
projectCode,
ApiFuncIdentificationConstant.map.get(executeType));
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
+ checkMasterExists();
- // check master exists
- if (!checkMasterExists(result)) {
- return result;
- }
+ ProcessInstance workflowInstance =
+
Optional.ofNullable(processInstanceDao.queryByWorkflowInstanceId(processInstanceId))
+ .orElseThrow(() -> new
ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
- ProcessInstance processInstance =
processService.findProcessInstanceDetailById(processInstanceId)
- .orElseThrow(() -> new
ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
+ checkState(workflowInstance.getProjectCode() == projectCode,
+ "The workflow instance's project code doesn't equals to the
given project");
+ ProcessDefinition processDefinition =
processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(
+ workflowInstance.getProcessDefinitionCode(),
workflowInstance.getProcessDefinitionVersion());
- ProcessDefinition processDefinition =
-
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
- processDefinition.setReleaseState(ReleaseState.ONLINE);
- if (executeType != ExecuteType.STOP && executeType !=
ExecuteType.PAUSE) {
- this.checkProcessDefinitionValid(projectCode, processDefinition,
processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
- }
+ executeClient.executeWorkflowInstance(new ExecuteContext(
+ workflowInstance,
+ processDefinition,
+ loginUser,
+ executeType));
- result = checkExecuteType(processInstance, executeType);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
- if (!checkTenantSuitable(processDefinition)) {
- log.error(
- "There is not any valid tenant for the process definition,
processDefinitionId:{}, processDefinitionCode:{}, ",
- processDefinition.getId(), processDefinition.getName());
- putMsg(result, Status.TENANT_NOT_SUITABLE);
- }
-
- // get the startParams user specified at the first starting while
repeat running is needed
- Map<String, Object> commandMap =
- JSONUtils.parseObject(processInstance.getCommandParam(), new
TypeReference<Map<String, Object>>() {
- });
- String startParams = null;
- if (MapUtils.isNotEmpty(commandMap) && executeType ==
ExecuteType.REPEAT_RUNNING) {
- Object startParamsJson = commandMap.get(CMD_PARAM_START_PARAMS);
- if (startParamsJson != null) {
- startParams = startParamsJson.toString();
- }
- }
-
- switch (executeType) {
- case REPEAT_RUNNING:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(),
- processDefinition.getVersion(),
CommandType.REPEAT_RUNNING, startParams,
- processInstance.getTestFlag());
- break;
- case RECOVER_SUSPENDED_PROCESS:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(),
- processDefinition.getVersion(),
CommandType.RECOVER_SUSPENDED_PROCESS, startParams,
- processInstance.getTestFlag());
- break;
- case START_FAILURE_TASK_PROCESS:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(),
- processDefinition.getVersion(),
CommandType.START_FAILURE_TASK_PROCESS, startParams,
- processInstance.getTestFlag());
- break;
- case STOP:
- if (processInstance.getState() ==
WorkflowExecutionStatus.READY_STOP) {
- log.warn("Process instance status is already {},
processInstanceName:{}.",
- WorkflowExecutionStatus.READY_STOP.getDesc(),
processInstance.getName());
- putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED,
processInstance.getName(),
- processInstance.getState());
- } else {
- result =
- updateProcessInstancePrepare(processInstance,
CommandType.STOP,
- WorkflowExecutionStatus.READY_STOP);
- }
- break;
- case PAUSE:
- if (processInstance.getState() ==
WorkflowExecutionStatus.READY_PAUSE) {
- log.warn("Process instance status is already {},
processInstanceName:{}.",
- WorkflowExecutionStatus.READY_STOP.getDesc(),
processInstance.getName());
- putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED,
processInstance.getName(),
- processInstance.getState());
- } else {
- result = updateProcessInstancePrepare(processInstance,
CommandType.PAUSE,
- WorkflowExecutionStatus.READY_PAUSE);
- }
- break;
- default:
- log.warn("Unknown execute type for process instance,
processInstanceId:{}.",
- processInstance.getId());
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown
execute type");
-
- break;
- }
+ Map<String, Object> result = new HashMap<>();
+ result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
@@ -504,10 +422,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Override
public Map<String, Object> execute(User loginUser, Integer
workflowInstanceId, ExecuteType executeType) {
ProcessInstance processInstance =
processInstanceMapper.selectById(workflowInstanceId);
- ProcessDefinition processDefinition =
-
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
-
- return execute(loginUser, processDefinition.getProjectCode(),
workflowInstanceId, executeType);
+ return execute(loginUser, processInstance.getProjectCode(),
workflowInstanceId, executeType);
}
/**
@@ -633,10 +548,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
- // check master exists
- if (!checkMasterExists(result)) {
- return result;
- }
+ checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
}
@@ -773,62 +685,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
- /**
- * insert command, used in the implementation of the page, rerun, recovery
(pause / failure) execution
- *
- * @param loginUser login user
- * @param instanceId instance id
- * @param processDefinitionCode process definition code
- * @param processVersion
- * @param commandType command type
- * @return insert result code
- */
- private Map<String, Object> insertCommand(User loginUser, Integer
instanceId, long processDefinitionCode,
- int processVersion, CommandType
commandType, String startParams,
- int testFlag) {
- Map<String, Object> result = new HashMap<>();
-
- // To add startParams only when repeat running is needed
- Map<String, Object> cmdParam = new HashMap<>();
- cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
- if (!StringUtils.isEmpty(startParams)) {
- cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
- }
-
- Command command = new Command();
- command.setCommandType(commandType);
- command.setProcessDefinitionCode(processDefinitionCode);
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- command.setExecutorId(loginUser.getId());
- command.setProcessDefinitionVersion(processVersion);
- command.setProcessInstanceId(instanceId);
- command.setTestFlag(testFlag);
- if (!commandService.verifyIsNeedCreateCommand(command)) {
- log.warn(
- "Process instance is executing the command,
processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
- processDefinitionCode, processVersion, instanceId);
- putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND,
String.valueOf(processDefinitionCode));
- return result;
- }
-
- log.info("Creating command, commandInfo:{}.", command);
- int create = commandService.createCommand(command);
-
- if (create > 0) {
- log.info("Create {} command complete, processDefinitionCode:{},
processDefinitionVersion:{}.",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode(), processVersion);
- putMsg(result, Status.SUCCESS);
- } else {
- log.error(
- "Execute process instance failed because create {} command
error, processDefinitionCode:{}, processDefinitionVersion:{},
processInstanceId:{}.",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode(), processVersion,
- instanceId);
- putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
- }
-
- return result;
- }
-
/**
* check whether sub processes are offline before starting process definition
*
@@ -1319,11 +1175,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
- // check master exists
- if (!checkMasterExists(result)) {
- return result;
- }
-
+ checkMasterExists();
// todo dispatch improvement
List<Server> masterServerList =
monitorService.getServerListFromRegistry(true);
Host host = new Host(masterServerList.get(0).getHost(),
masterServerList.get(0).getPort());
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index c7e36efb3a..b8def57492 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -236,6 +236,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
+ @Lazy
@Autowired
private SchedulerService schedulerService;
@@ -724,6 +725,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return processDefinition;
}
+ @Override
+ public Optional<ProcessDefinition> queryWorkflowDefinition(long
workflowDefinitionCode,
+ int
workflowDefinitionVersion) {
+ ProcessDefinition workflowDefinition =
processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null);
+ if (workflowDefinition == null || workflowDefinition.getVersion() !=
workflowDefinitionVersion) {
+ workflowDefinition =
processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode,
+ workflowDefinitionVersion);
+ }
+ return Optional.ofNullable(workflowDefinition);
+ }
+
+ @Override
+ public ProcessDefinition
queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode,
+
int workflowDefinitionVersion) {
+ return queryWorkflowDefinition(workflowDefinitionCode,
workflowDefinitionVersion)
+ .orElseThrow(() -> new
ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
+ String.valueOf(workflowDefinitionCode)));
+ }
+
@Override
public Map<String, Object> queryProcessDefinitionByName(User loginUser,
long projectCode, String name) {
Project project = projectMapper.queryByCode(projectCode);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 9d151e528f..2f7417a4cd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -278,6 +278,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
+ @Override
+ public ProcessInstance
queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) {
+ ProcessInstance processInstance =
processInstanceDao.queryByWorkflowInstanceId(workflowInstanceId);
+ if (processInstance == null) {
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST,
workflowInstanceId);
+ }
+ return processInstance;
+ }
+
/**
* query workflow instance by id
*
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index bd1349f4fa..35aeacf798 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -242,6 +242,12 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
}
}
+ @Override
+ public void checkProjectAndAuthThrowException(User loginUser, long
projectCode, String permission) {
+ Project project = projectMapper.queryByCode(projectCode);
+ checkProjectAndAuthThrowException(loginUser, project, permission);
+ }
+
@Override
public boolean hasProjectAndPerm(User loginUser, Project project,
Map<String, Object> result, String permission) {
boolean checkResult = false;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
similarity index 99%
rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
index 68ec7a5c0d..2f5cfb960a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
@@ -55,7 +55,7 @@ import com.google.gson.JsonObject;
/**
* executor controller test
*/
-public class ExecutorControllerTest extends AbstractControllerTest {
+public class ExecuteFunctionControllerTest extends AbstractControllerTest {
final Gson gson = new Gson();
final long projectCode = 1L;
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
similarity index 94%
rename from
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
rename to
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
index fe19b5b402..21630b0e1d 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
@@ -31,8 +31,8 @@ import
org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.executor.ExecuteClient;
import
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
-import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
@@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
@@ -96,11 +97,9 @@ import org.slf4j.LoggerFactory;
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-public class ExecutorServiceTest {
+public class ExecuteFunctionServiceTest {
- private static final Logger logger =
LoggerFactory.getLogger(ExecutorServiceTest.class);
-
- private static final Logger baseServiceLogger =
LoggerFactory.getLogger(BaseServiceImpl.class);
+ private static final Logger logger =
LoggerFactory.getLogger(ExecuteFunctionServiceTest.class);
@Mock
private ResourcePermissionCheckService resourcePermissionCheckService;
@@ -147,6 +146,15 @@ public class ExecutorServiceTest {
@Mock
private TriggerRelationService triggerRelationService;
+ @Mock
+ private ExecuteClient executeClient;
+
+ @Mock
+ private ProcessInstanceDao processInstanceDao;
+
+ @Mock
+ private ProcessDefinitionService processDefinitionService;
+
private int processDefinitionId = 1;
private int processDefinitionVersion = 1;
@@ -195,6 +203,7 @@ public class ExecutorServiceTest {
// processInstance
processInstance.setId(processInstanceId);
+ processInstance.setProjectCode(projectCode);
processInstance.setState(WorkflowExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
@@ -453,25 +462,39 @@ public class ExecutorServiceTest {
public void testNoMasterServers() {
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new
ArrayList<>());
- Map<String, Object> result =
executorService.execProcessInstance(loginUser, projectCode,
+ Assertions.assertThrows(ServiceException.class, () ->
executorService.execProcessInstance(
+ loginUser,
+ projectCode,
processDefinitionCode,
"{\"complementStartDate\":\"2020-01-01
00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA,
- null, null,
- null, null, null,
+ null,
+ null,
+ null,
+ null,
+ null,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW,
+ Constants.DEFAULT_WORKER_GROUP,
+ 100L,
+ 110,
+ null,
+ 0,
+ Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
- ComplementDependentMode.OFF_MODE, null);
- Assertions.assertEquals(result.get(Constants.STATUS),
Status.MASTER_NOT_EXISTS);
+ ComplementDependentMode.OFF_MODE,
+ null));
}
@Test
public void testExecuteRepeatRunning() {
-
Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
- Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode, RERUN))
+
when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
+ when(projectService.checkProjectAndAuth(loginUser, project,
projectCode, RERUN))
.thenReturn(checkProjectAndAuth());
+
when(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)).thenReturn(processInstance);
+
when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode,
+ processDefinitionVersion)).thenReturn(processDefinition);
Map<String, Object> result =
executorService.execute(loginUser, projectCode,
processInstanceId, ExecuteType.REPEAT_RUNNING);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index 290be4377c..12d0a69fb7 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@@ -33,6 +36,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
@TableName("t_ds_command")
public class Command {
@@ -55,10 +61,10 @@ public class Command {
private String commandParam;
@TableField("task_depend_type")
- private TaskDependType taskDependType;
+ private TaskDependType taskDependType = TaskDependType.TASK_POST;
@TableField("failure_strategy")
- private FailureStrategy failureStrategy;
+ private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
@TableField("warning_type")
private WarningType warningType;
@@ -70,13 +76,13 @@ public class Command {
private Date scheduleTime;
@TableField("start_time")
- private Date startTime;
+ private Date startTime = new Date();
@TableField("process_instance_priority")
private Priority processInstancePriority;
@TableField("update_time")
- private Date updateTime;
+ private Date updateTime = new Date();
@TableField("worker_group")
private String workerGroup;
@@ -99,13 +105,6 @@ public class Command {
@TableField("test_flag")
private int testFlag;
- public Command() {
- this.taskDependType = TaskDependType.TASK_POST;
- this.failureStrategy = FailureStrategy.CONTINUE;
- this.startTime = new Date();
- this.updateTime = new Date();
- }
-
public Command(
CommandType commandType,
TaskDependType taskDependType,
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
index 7ba19d6892..4f32d142cd 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
@@ -17,7 +17,11 @@
package org.apache.dolphinscheduler.dao.repository;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
public interface ProcessDefinitionLogDao {
+ ProcessDefinitionLog queryProcessDefinitionLog(long
workflowDefinitionCode, int workflowDefinitionVersion);
+
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index f9002c823d..b249e265e7 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
void deleteById(Integer workflowInstanceId);
+ ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
index 47a1747c4f..0f757193fb 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.repository.impl;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
@@ -29,6 +30,12 @@ public class ProcessDefinitionLogDaoImpl implements
ProcessDefinitionLogDao {
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
+ @Override
+ public ProcessDefinitionLog queryProcessDefinitionLog(long
workflowDefinitionCode, int workflowDefinitionVersion) {
+ return
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode,
+ workflowDefinitionVersion);
+ }
+
@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 2832e0a3f4..ae36ca924e 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -69,4 +69,9 @@ public class ProcessInstanceDaoImpl implements
ProcessInstanceDao {
public void deleteById(Integer workflowInstanceId) {
processInstanceMapper.deleteById(workflowInstanceId);
}
+
+ @Override
+ public ProcessInstance queryByWorkflowInstanceId(Integer
workflowInstanceId) {
+ return processInstanceMapper.selectById(workflowInstanceId);
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
index 7988bd0459..595823f7b9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -30,8 +32,9 @@ import lombok.experimental.UtilityClass;
public class ShellUtils {
public List<String> ENV_SOURCE_LIST = Arrays.stream(
-
Optional.ofNullable(PropertyUtils.getString("shell.env_source_list"))
- .map(s -> s.split(",")).orElse(new String[0]))
+
Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s ->
s.split(","))
+ .orElse(new String[0]))
.map(String::trim)
+ .filter(StringUtils::isNotBlank)
.collect(Collectors.toList());
}