This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 9d89eb0 TaskExecutionContext create modify (#1994)
9d89eb0 is described below
commit 9d89eb03cdce259aef80dcac40da55a14876ad62
Author: qiaozhanwei <[email protected]>
AuthorDate: Fri Feb 21 22:34:40 2020 +0800
TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
---
.../builder/TaskExecutionContextBuilder.java | 91 ++++++++++++++++++++++
.../master/runner/MasterBaseTaskExecThread.java | 57 +++++---------
2 files changed, 110 insertions(+), 38 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
new file mode 100644
index 0000000..cafd894
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.builder;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+
+/**
+ * TaskExecutionContext builder
+ *
+ * <p>Fluent helper that copies selected fields of a TaskInstance, a
+ * ProcessInstance and a ProcessDefinition into one TaskExecutionContext.
+ * Obtain a builder through the static get() factory, chain the
+ * buildXxxRelatedInfo methods (each one returns this builder) and finish
+ * with create().
+ *
+ * <p>Each builder holds exactly one TaskExecutionContext, allocated by the
+ * field initializer when the builder is constructed. Every
+ * buildXxxRelatedInfo call mutates that same object, and create() hands it
+ * out without copying.
+ *
+ * <p>None of the buildXxxRelatedInfo methods null-check their argument.
+ */
+public class TaskExecutionContextBuilder {
+
+ public static TaskExecutionContextBuilder get(){
+ return new TaskExecutionContextBuilder();
+ }
+
+ private TaskExecutionContext taskExecutionContext = new
TaskExecutionContext();
+
+ /**
+ * build taskInstance related info
+ *
+ * <p>Copies the task instance's id (stored as the context's task id),
+ * name, start time, task type, execute path and task json into the
+ * context held by this builder.
+ *
+ * @param taskInstance taskInstance to read from; dereferenced without a
+ * null check
+ * @return TaskExecutionContextBuilder this builder, for chaining
+ */
+ public TaskExecutionContextBuilder
buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
+ taskExecutionContext.setTaskId(taskInstance.getId());
+ taskExecutionContext.setTaskName(taskInstance.getName());
+ taskExecutionContext.setStartTime(taskInstance.getStartTime());
+ taskExecutionContext.setTaskType(taskInstance.getTaskType());
+ taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
+ taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
+ return this;
+ }
+
+
+ /**
+ * build processInstance related info
+ *
+ * <p>Copies the process instance's id (stored as the context's process
+ * instance id), schedule time, global params, executor id, tenant code
+ * and queue, plus the code of its getCmdTypeIfComplement() value.
+ *
+ * <p>NOTE(review): the result of getCmdTypeIfComplement() is dereferenced
+ * directly; presumably it is never null - confirm.
+ *
+ * @param processInstance processInstance to read from; dereferenced
+ * without a null check
+ * @return TaskExecutionContextBuilder this builder, for chaining
+ */
+ public TaskExecutionContextBuilder
buildProcessInstanceRelatedInfo(ProcessInstance processInstance){
+ taskExecutionContext.setProcessInstanceId(processInstance.getId());
+
taskExecutionContext.setScheduleTime(processInstance.getScheduleTime());
+
taskExecutionContext.setGlobalParams(processInstance.getGlobalParams());
+ taskExecutionContext.setExecutorId(processInstance.getExecutorId());
+
taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode());
+ taskExecutionContext.setTenantCode(processInstance.getTenantCode());
+ taskExecutionContext.setQueue(processInstance.getQueue());
+ return this;
+ }
+
+ /**
+ * build processDefinition related info
+ *
+ * <p>Copies the process definition's id (stored as the context's process
+ * define id) and project id into the context held by this builder.
+ *
+ * <p>NOTE(review): the MasterBaseTaskExecThread caller in this same commit
+ * treats taskInstance.getProcessDefine() as possibly null when it computes
+ * the user id, yet passes it here unguarded - confirm a null definition
+ * cannot reach this method.
+ *
+ * @param processDefinition processDefinition to read from; dereferenced
+ * without a null check
+ * @return TaskExecutionContextBuilder this builder, for chaining
+ */
+ public TaskExecutionContextBuilder
buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){
+ taskExecutionContext.setProcessDefineId(processDefinition.getId());
+ taskExecutionContext.setProjectId(processDefinition.getProjectId());
+ return this;
+ }
+
+ /**
+ * create
+ *
+ * <p>Returns the context held by this builder. No copy is made, so any
+ * later buildXxxRelatedInfo call on this builder keeps mutating the
+ * returned object.
+ *
+ * @return taskExecutionContext the context populated so far
+ */
+ public TaskExecutionContext create(){
+ return taskExecutionContext;
+ }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index d704629..0167839 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -20,6 +20,7 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@@ -33,6 +34,7 @@ import
org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -128,13 +130,9 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
// TODO send task to worker
public void sendToWorker(TaskInstance taskInstance){
final Address address = new Address("127.0.0.1", 12346);
- /**
- * set taskInstance relation
- */
- TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance);
ExecuteTaskRequestCommand taskRequestCommand = new
ExecuteTaskRequestCommand(
-
FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
+
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
try {
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), 2000);
@@ -156,18 +154,25 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
}
/**
- * set task instance relation
+ * get TaskExecutionContext
*
* @param taskInstance taskInstance
+ * @return TaskExecutionContext
*/
- private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){
+ private TaskExecutionContext getTaskExecutionContext(TaskInstance
taskInstance){
taskInstance =
processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
- int userId = taskInstance.getProcessDefine() == null ? 0 :
taskInstance.getProcessDefine().getUserId();
+ Integer userId = taskInstance.getProcessDefine() == null ? 0 :
taskInstance.getProcessDefine().getUserId();
Tenant tenant =
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
userId);
+
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
- processService.changeTaskState(ExecutionStatus.FAILURE,
taskInstance.getStartTime(), taskInstance.getHost(), null, null,
taskInstance.getId());
+ processService.changeTaskState(ExecutionStatus.FAILURE,
+ taskInstance.getStartTime(),
+ taskInstance.getHost(),
+ null,
+ null,
+ taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes
precedence over tenant queue
@@ -175,7 +180,11 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ?
tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
- return taskInstance;
+ return TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+ .create();
}
@@ -197,34 +206,6 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
/**
- * taskInstance convert to taskInfo
- *
- * @param taskInstance taskInstance
- * @return taskInfo
- */
- private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
- taskExecutionContext.setTaskId(taskInstance.getId());
- taskExecutionContext.setTaskName(taskInstance.getName());
- taskExecutionContext.setStartTime(taskInstance.getStartTime());
- taskExecutionContext.setTaskType(taskInstance.getTaskType());
- taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance));
- taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
-
taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId());
-
taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
-
taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
-
taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
-
taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
-
taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
-
taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue());
-
taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId());
-
taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId());
-
- return taskExecutionContext;
- }
-
-
- /**
* get execute local path
*
* @return execute local path