mgsky1 commented on a change in pull request #6272:
URL: https://github.com/apache/dolphinscheduler/pull/6272#discussion_r716203874



##########
File path: 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.model.DependentItem;
+import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.utils.DependentUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ * blocking task processor
+ */
+public class BlockingTaskProcessor extends BaseTaskProcessor {
+    /**
+     * dependent parameters
+     */
+    private DependentParameters dependentParameters;
+
+    private ProcessInstance processInstance;
+
+    /**
+     * condition result
+     */
+    private DependResult conditionResult = DependResult.WAITING;
+
+    /**
+     * complete task map
+     */
+    private Map<String, ExecutionStatus> completeTaskList = new 
ConcurrentHashMap<>();
+
+    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
+    MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
+
+
+    private boolean isBlocked;
+
+    /**
+     * blocking condition result
+     */
+    private String blockingConditionResult;
+
+    @Override
+    public boolean submit(TaskInstance task, ProcessInstance processInstance, 
int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
+        this.processInstance = processInstance;
+        this.taskInstance = processService.submitTask(task, 
masterTaskCommitRetryTimes, masterTaskCommitInterval);
+
+        if (this.taskInstance == null) {
+            return false;
+        }
+
+        logger = 
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId()));
+        String threadLoggerInfoName = 
String.format(Constants.TASK_LOG_INFO_FORMAT, 
processService.formatTaskAppId(this.taskInstance));
+        Thread.currentThread().setName(threadLoggerInfoName);
+        initTaskParameters();
+        logger.info("blocking task start");
+        return true;
+    }
+
+    @Override
+    public ExecutionStatus taskState() {
+        return this.taskInstance.getState();
+    }
+
+    @Override
+    protected boolean pauseTask() {
+        this.taskInstance.setState(ExecutionStatus.PAUSE);
+        this.taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        return true;
+    }
+
+    @Override
+    protected boolean killTask() {
+        this.taskInstance.setState(ExecutionStatus.KILL);
+        this.taskInstance.setEndTime(new Date());
+        processService.saveTaskInstance(taskInstance);
+        return true;
+    }
+
+    @Override
+    protected boolean taskTimeout() {
+        return false;
+    }
+
+    @Override
+    public void run() {
+        if (conditionResult.equals(DependResult.WAITING)) {
+            setConditionResult();
+        }
+        endTask();
+    }
+
+    /**
+     * init task running parameters
+     */
+    private void initTaskParameters() {
+        
this.taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId()));
+        
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+        this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        this.taskInstance.setStartTime(new Date());
+        this.processService.saveTaskInstance(taskInstance);
+        this.dependentParameters = taskInstance.getDependency();
+        this.blockingConditionResult = taskInstance.getBlockingCondition();
+    }
+
+    /**
+     *
+     * @param item the dependent item containing depTasks and status
+     * @return depend result for depend item. SUCCESS or FAILED
+     */
+    private DependResult getDependentResultForItem(DependentItem item) {
+        DependResult dependResult = DependResult.SUCCESS;
+        if (!completeTaskList.containsKey(item.getDepTasks())) {
+            logger.info("depend item: {} have not completed yet.", 
item.getDepTasks());
+            return dependResult;
+        }
+        // the actual status of task
+        ExecutionStatus executionStatus = 
completeTaskList.get(item.getDepTasks());
+        if (executionStatus != item.getStatus()) {
+            logger.info("depend item: {} expect status: {}, actual status: 
{}",item.getDepTasks(),
+                    item.getStatus(),executionStatus);
+            dependResult = DependResult.FAILED;
+        }
+        logger.info("dependent item complete {} {}, {}",
+                Constants.DEPENDENT_SPLIT,item.getStatus(),executionStatus);
+        return dependResult;
+    }
+
+    private void setConditionResult() {
+
+        List<TaskInstance> taskInstances = 
processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
+        for (TaskInstance task : taskInstances) {
+            completeTaskList.putIfAbsent(task.getName(), task.getState());
+        }
+
+        List<DependResult> modelResultList = new ArrayList<>();
+        for (DependentTaskModel dependentTaskModel : 
dependentParameters.getDependTaskList()) {
+            List<DependResult> itemDependResult = new ArrayList<>();
+            for (DependentItem item : dependentTaskModel.getDependItemList()) {
+                itemDependResult.add(getDependentResultForItem(item));
+            }
+            DependResult modelResult = 
DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), 
itemDependResult);
+            modelResultList.add(modelResult);
+        }
+        conditionResult = 
DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), 
modelResultList);
+        logger.info("the conditions task depend result : {}", conditionResult);
+    }
+
+    /**
+     * set task execution status and then update db
+     * for blocking task, determine blocking logic result in this function
+     */
+    private void endTask() {
+        ExecutionStatus status = ExecutionStatus.SUCCESS;
+        DependResult expected = 
"BlockingOnSuccess".equals(this.blockingConditionResult) ? DependResult.SUCCESS 
: DependResult.FAILED;

Review comment:
       If replace hardcode to variable, the code fragment maybe look like this:
   
   ```java
   String var = "BlockingOnSuccess";
   DependResult expected = var.equals(this.blockingConditionResult) ? 
DependResult.SUCCESS : DependResult.FAILED;
   ```
   It is confused to me why using variable is better?
   The hardcode `BlockingOnSuccess ` is received from task definition json 
string. I use this value to consider user expected `DependResult `. I think 
using hardcode and varivale are  the same.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to