This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 9d89eb0  TaskExecutionContext create modify (#1994)
9d89eb0 is described below

commit 9d89eb03cdce259aef80dcac40da55a14876ad62
Author: qiaozhanwei <[email protected]>
AuthorDate: Fri Feb 21 22:34:40 2020 +0800

    TaskExecutionContext create modify (#1994)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
---
 .../builder/TaskExecutionContextBuilder.java       | 91 ++++++++++++++++++++++
 .../master/runner/MasterBaseTaskExecThread.java    | 57 +++++---------
 2 files changed, 110 insertions(+), 38 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
new file mode 100644
index 0000000..cafd894
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.builder;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+
+/**
+ *  TaskExecutionContext builder
+ */
+public class TaskExecutionContextBuilder {
+
+    public static TaskExecutionContextBuilder get(){
+        return new TaskExecutionContextBuilder();
+    }
+
+    private TaskExecutionContext taskExecutionContext =  new 
TaskExecutionContext();
+
+    /**
+     * build taskInstance related info
+     *
+     * @param taskInstance taskInstance
+     * @return TaskExecutionContextBuilder
+     */
+    public TaskExecutionContextBuilder 
buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
+        taskExecutionContext.setTaskId(taskInstance.getId());
+        taskExecutionContext.setTaskName(taskInstance.getName());
+        taskExecutionContext.setStartTime(taskInstance.getStartTime());
+        taskExecutionContext.setTaskType(taskInstance.getTaskType());
+        taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
+        taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
+        return this;
+    }
+
+
+    /**
+     * build processInstance related info
+     *
+     * @param processInstance processInstance
+     * @return TaskExecutionContextBuilder
+     */
+    public TaskExecutionContextBuilder 
buildProcessInstanceRelatedInfo(ProcessInstance processInstance){
+        taskExecutionContext.setProcessInstanceId(processInstance.getId());
+        
taskExecutionContext.setScheduleTime(processInstance.getScheduleTime());
+        
taskExecutionContext.setGlobalParams(processInstance.getGlobalParams());
+        taskExecutionContext.setExecutorId(processInstance.getExecutorId());
+        
taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode());
+        taskExecutionContext.setTenantCode(processInstance.getTenantCode());
+        taskExecutionContext.setQueue(processInstance.getQueue());
+        return this;
+    }
+
+    /**
+     * build processDefinition related info
+     *
+     * @param processDefinition processDefinition
+     * @return TaskExecutionContextBuilder
+     */
+    public TaskExecutionContextBuilder 
buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){
+        taskExecutionContext.setProcessDefineId(processDefinition.getId());
+        taskExecutionContext.setProjectId(processDefinition.getProjectId());
+        return this;
+    }
+
+    /**
+     * return the TaskExecutionContext populated by the preceding build calls
+     *
+     * @return taskExecutionContext
+     */
+    public TaskExecutionContext create(){
+        return taskExecutionContext;
+    }
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index d704629..0167839 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -20,6 +20,7 @@ import 
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
@@ -33,6 +34,7 @@ import 
org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Address;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -128,13 +130,9 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
     // TODO send task to worker
     public void sendToWorker(TaskInstance taskInstance){
         final Address address = new Address("127.0.0.1", 12346);
-        /**
-         *  set taskInstance relation
-         */
-        TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance);
 
         ExecuteTaskRequestCommand taskRequestCommand = new 
ExecuteTaskRequestCommand(
-                
FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
+                
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
         try {
             Command responseCommand = nettyRemotingClient.sendSync(address,
                     taskRequestCommand.convert2Command(), 2000);
@@ -156,18 +154,25 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
     }
 
     /**
-     *  set task instance relation
+     * get TaskExecutionContext
      *
      * @param taskInstance taskInstance
+     * @return TaskExecutionContext
      */
-    private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){
+    private TaskExecutionContext getTaskExecutionContext(TaskInstance 
taskInstance){
         taskInstance = 
processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
 
-        int userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
+        Integer userId = taskInstance.getProcessDefine() == null ? 0 : 
taskInstance.getProcessDefine().getUserId();
         Tenant tenant = 
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
 userId);
+
         // verify tenant is null
         if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(ExecutionStatus.FAILURE, 
taskInstance.getStartTime(), taskInstance.getHost(), null, null, 
taskInstance.getId());
+            processService.changeTaskState(ExecutionStatus.FAILURE,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    null,
+                    taskInstance.getId());
             return null;
         }
         // set queue for process instance, user-specified queue takes 
precedence over tenant queue
@@ -175,7 +180,11 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
         
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
 
-        return taskInstance;
+        return TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+                
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+                .create();
     }
 
 
@@ -197,34 +206,6 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
 
 
     /**
-     * taskInstance convert to taskInfo
-     *
-     * @param taskInstance taskInstance
-     * @return taskInfo
-     */
-    private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){
-        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
-        taskExecutionContext.setTaskId(taskInstance.getId());
-        taskExecutionContext.setTaskName(taskInstance.getName());
-        taskExecutionContext.setStartTime(taskInstance.getStartTime());
-        taskExecutionContext.setTaskType(taskInstance.getTaskType());
-        taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance));
-        taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
-        
taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId());
-        
taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
-        
taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
-        
taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
-        
taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
-        
taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
-        
taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue());
-        
taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId());
-        
taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId());
-
-        return taskExecutionContext;
-    }
-
-
-    /**
      * get execute local path
      *
      * @return execute local path

Reply via email to