caishunfeng commented on code in PR #13948:
URL: 
https://github.com/apache/dolphinscheduler/pull/13948#discussion_r1174800668


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread 
implements AutoCloseable {
+
+    @Autowired
+    private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
+
+    private ExecutorService asyncTaskStateCheckThreadPool;
+
+    public AsyncMasterTaskDelayQueueLooper() {
+        super("AsyncMasterTaskDelayQueueLooper");
+    }
+
+    @Override
+    public synchronized void start() {
+        if (!RUNNING_FLAG.compareAndSet(false, true)) {
+            log.info("The AsyncMasterTaskDelayQueueLooper has already been 
started, will not start again");
+            return;
+        }
+
+        log.info("AsyncMasterTaskDelayQueueLooper starting...");
+        super.start();
+        log.info("AsyncMasterTaskDelayQueueLooper started...");
+    }
+
+    @Override
+    public void run() {
+        asyncTaskStateCheckThreadPool = 
ThreadUtils.newDaemonFixedThreadExecutor("AsyncTaskStateCheckThreadPool",
+                masterConfig.getMasterAsyncTaskStateCheckThreadPoolSize());
+        while (RUNNING_FLAG.get()) {
+            AsyncTaskExecutionContext asyncTaskExecutionContext;
+            try {
+                asyncTaskExecutionContext = 
asyncMasterTaskDelayQueue.pollAsyncTask();
+            } catch (InterruptedException e) {
+                log.info("AsyncConditionTaskLooper has been interrupted, will 
break this loop", e);
+                Thread.currentThread().interrupt();
+                break;
+            }
+            final TaskExecutionContext taskExecutionContext = 
asyncTaskExecutionContext.getTaskExecutionContext();
+            try (
+                    LogUtils.MDCAutoClosableContext mdcAutoClosableContext = 
LogUtils.setWorkflowAndTaskInstanceIDMDC(
+                            taskExecutionContext.getProcessInstanceId(), 
taskExecutionContext.getTaskInstanceId());
+                    LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 =
+                            
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) {
+
+                if (MasterTaskExecutionContextHolder
+                        
.getTaskExecutionContext(taskExecutionContext.getTaskInstanceId()) == null) {
+                    log.warn(
+                            "Cannot find the taskInstance from 
TaskExecutionContextCacheManager, the task may already been killed, will stop 
the async master task");
+                    continue;
+                }
+                asyncTaskStateCheckThreadPool.submit(() -> {
+                    final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
+                            
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
+                    final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
+                            
asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
+                    try (
+                            LogUtils.MDCAutoClosableContext 
mdcAutoClosableContext2 =
+                                    
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) {
+                        AsyncTaskExecuteFunction.AsyncTaskExecutionStatus 
asyncTaskExecutionStatus =
+                                
asyncTaskExecuteFunction.getAsyncTaskExecutionStatus();
+                        switch (asyncTaskExecutionStatus) {
+                            case RUNNING:

Review Comment:
   Why add async task into delay queue when RUNNING? Can you add some comments 
for it?



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread 
implements AutoCloseable {
+
+    @Autowired
+    private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
+
+    private ExecutorService asyncTaskStateCheckThreadPool;
+
+    public AsyncMasterTaskDelayQueueLooper() {
+        super("AsyncMasterTaskDelayQueueLooper");
+    }
+
+    @Override
+    public synchronized void start() {
+        if (!RUNNING_FLAG.compareAndSet(false, true)) {
+            log.info("The AsyncMasterTaskDelayQueueLooper has already been 
started, will not start again");
+            return;
+        }
+
+        log.info("AsyncMasterTaskDelayQueueLooper starting...");
+        super.start();
+        log.info("AsyncMasterTaskDelayQueueLooper started...");
+    }
+
+    @Override
+    public void run() {
+        asyncTaskStateCheckThreadPool = 
ThreadUtils.newDaemonFixedThreadExecutor("AsyncTaskStateCheckThreadPool",
+                masterConfig.getMasterAsyncTaskStateCheckThreadPoolSize());
+        while (RUNNING_FLAG.get()) {
+            AsyncTaskExecutionContext asyncTaskExecutionContext;
+            try {
+                asyncTaskExecutionContext = 
asyncMasterTaskDelayQueue.pollAsyncTask();
+            } catch (InterruptedException e) {
+                log.info("AsyncConditionTaskLooper has been interrupted, will 
break this loop", e);

Review Comment:
   ```suggestion
                   log.error("AsyncConditionTaskLooper has been interrupted, 
will break this loop", e);
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+import static 
org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
+
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import 
org.apache.dolphinscheduler.server.master.exception.LogicTaskFactoryNotFoundException;
+import 
org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException;
+import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
+import 
org.apache.dolphinscheduler.server.master.runner.message.MasterMessageSenderManager;
+import org.apache.dolphinscheduler.server.master.runner.task.ILogicTask;
+import 
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class MasterTaskExecuteRunnable implements Runnable {
+
+    protected final TaskExecutionContext taskExecutionContext;
+    protected final LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder;
+    protected final MasterMessageSenderManager masterMessageSenderManager;
+    protected ILogicTask logicTask;
+
+    public MasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext,
+                                     LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
+                                     MasterMessageSenderManager 
masterMessageSenderManager) {
+        this.taskExecutionContext = taskExecutionContext;
+        this.logicTaskPluginFactoryBuilder = logicTaskPluginFactoryBuilder;
+        this.masterMessageSenderManager = masterMessageSenderManager;
+    }
+
+    protected abstract void executeTask() throws MasterTaskExecuteException;
+
+    protected abstract void afterExecute() throws MasterTaskExecuteException;
+
+    protected void afterThrowing(Throwable throwable) {
+        try {
+            cancelTask();
+            log.info("Get a exception when execute the task, canceled the 
task");
+        } catch (Exception e) {
+            // todo: if the task cancel failed, do we need to set the task 
status to failure?

Review Comment:
   I think we need it, but maybe `Canceled` is better than `Fail`.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.blocking;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
+import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
+import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+@Slf4j
+public class BlockingLogicTask extends BaseSyncLogicTask<BlockingParameters> {
+
+    public static final String TASK_TYPE = "BLOCKING";
+
+    private final ProcessInstanceExecCacheManager 
processInstanceExecCacheManager;
+
+    private final ProcessInstanceDao processInstanceDao;
+
+    private final TaskInstanceDao taskInstanceDao;
+
+    public BlockingLogicTask(TaskExecutionContext taskExecutionContext,
+                             ProcessInstanceExecCacheManager 
processInstanceExecCacheManager,
+                             ProcessInstanceDao processInstanceDao,
+                             TaskInstanceDao taskInstanceDao) {
+        super(taskExecutionContext,
+                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
new TypeReference<BlockingParameters>() {
+                }));
+        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+        this.processInstanceDao = processInstanceDao;
+        this.taskInstanceDao = taskInstanceDao;
+    }
+
+    @Override
+    public void handle() throws MasterTaskExecuteException {
+        DependResult conditionResult = calculateConditionResult();
+        DependResult expected = taskParameters.getBlockingOpportunity()
+                .equals(BlockingOpportunity.BLOCKING_ON_SUCCESS.getDesc())
+                        ? DependResult.SUCCESS
+                        : DependResult.FAILED;
+        boolean isBlocked = (expected == conditionResult);
+        log.info("blocking opportunity: expected-->{}, actual-->{}", expected, 
conditionResult);
+        ProcessInstance workflowInstance = processInstanceExecCacheManager
+                
.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()).getProcessInstance();
+        workflowInstance.setBlocked(isBlocked);
+        if (isBlocked) {
+            
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_BLOCK, "ready 
block");
+        }
+        
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+    }
+
+    private DependResult calculateConditionResult() throws 
MasterTaskExecuteException {
+        // todo: Directly get the task instance from the cache
+        Map<Long, TaskInstance> completeTaskList = taskInstanceDao
+                
.findValidTaskListByProcessId(taskExecutionContext.getProcessInstanceId(),
+                        taskExecutionContext.getTestFlag())
+                .stream()
+                .collect(Collectors.toMap(TaskInstance::getTaskCode, 
Function.identity()));
+
+        // todo: we need to parse the task parameter from TaskExecutionContext
+        TaskInstance taskInstance =
+                
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
+                        
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
+                        .orElseThrow(() -> new 
MasterTaskExecuteException("Task instance not found"));
+        DependentParameters dependentParameters = taskInstance.getDependency();
+
+        List<DependResult> tempResultList = new ArrayList<>();
+        for (DependentTaskModel dependentTaskModel : 
dependentParameters.getDependTaskList()) {
+            List<DependResult> itemDependResult = new ArrayList<>();
+            for (DependentItem item : dependentTaskModel.getDependItemList()) {
+                itemDependResult.add(getDependResultForItem(item, 
completeTaskList));
+            }
+            DependResult tempResult =
+                    
DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), 
itemDependResult);
+            tempResultList.add(tempResult);
+        }
+        return 
DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), 
tempResultList);
+    }
+
+    private DependResult getDependResultForItem(DependentItem item, Map<Long, 
TaskInstance> completeTaskList) {
+
+        DependResult dependResult = DependResult.SUCCESS;
+        if (!completeTaskList.containsKey(item.getDepTaskCode())) {
+            log.info("depend item: {} have not completed yet.", 
item.getDepTaskCode());
+            dependResult = DependResult.FAILED;

Review Comment:
   If not completed, it should be WAITING until dependent finish?



##########
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java:
##########
@@ -180,24 +180,22 @@ public void release() {
      * scan future table
      */
     public static void scanFutureTable() {
-        final List<ResponseFuture> futureList = new LinkedList<>();
         Iterator<Map.Entry<Long, ResponseFuture>> it = 
FUTURE_TABLE.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<Long, ResponseFuture> next = it.next();
             ResponseFuture future = next.getValue();
-            if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 
1000) <= System.currentTimeMillis()) {
-                futureList.add(future);
-                it.remove();
-                log.warn("remove timeout request : {}", future);
+            if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 
1000) > System.currentTimeMillis()) {
+                continue;
             }
-        }
-        for (ResponseFuture future : futureList) {
             try {
+                // todo: use thread pool to execute the async callback, 
otherwise will block the scan thread
                 future.release();
                 future.executeInvokeCallback();
             } catch (Exception ex) {
-                log.warn("scanFutureTable, execute callback error", ex);
+                log.error("ScanFutureTable, execute callback error, requestId: 
{}", future.getOpaque(), ex);
             }
+            it.remove();
+            log.warn("Remove timeout request: {}", future);

Review Comment:
   ```suggestion
               log.debug("Remove timeout request: {}", future);
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.blocking;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
+import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
+import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+@Slf4j
+public class BlockingLogicTask extends BaseSyncLogicTask<BlockingParameters> {
+
+    public static final String TASK_TYPE = "BLOCKING";
+
+    private final ProcessInstanceExecCacheManager 
processInstanceExecCacheManager;
+
+    private final ProcessInstanceDao processInstanceDao;
+
+    private final TaskInstanceDao taskInstanceDao;
+
+    public BlockingLogicTask(TaskExecutionContext taskExecutionContext,
+                             ProcessInstanceExecCacheManager 
processInstanceExecCacheManager,
+                             ProcessInstanceDao processInstanceDao,
+                             TaskInstanceDao taskInstanceDao) {
+        super(taskExecutionContext,
+                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
new TypeReference<BlockingParameters>() {
+                }));
+        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+        this.processInstanceDao = processInstanceDao;
+        this.taskInstanceDao = taskInstanceDao;
+    }
+
+    @Override
+    public void handle() throws MasterTaskExecuteException {
+        DependResult conditionResult = calculateConditionResult();
+        DependResult expected = taskParameters.getBlockingOpportunity()
+                .equals(BlockingOpportunity.BLOCKING_ON_SUCCESS.getDesc())
+                        ? DependResult.SUCCESS
+                        : DependResult.FAILED;
+        boolean isBlocked = (expected == conditionResult);
+        log.info("blocking opportunity: expected-->{}, actual-->{}", expected, 
conditionResult);
+        ProcessInstance workflowInstance = processInstanceExecCacheManager
+                
.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()).getProcessInstance();
+        workflowInstance.setBlocked(isBlocked);
+        if (isBlocked) {
+            
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_BLOCK, "ready 
block");
+        }
+        
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+    }
+
+    private DependResult calculateConditionResult() throws 
MasterTaskExecuteException {
+        // todo: Directly get the task instance from the cache
+        Map<Long, TaskInstance> completeTaskList = taskInstanceDao
+                
.findValidTaskListByProcessId(taskExecutionContext.getProcessInstanceId(),
+                        taskExecutionContext.getTestFlag())
+                .stream()
+                .collect(Collectors.toMap(TaskInstance::getTaskCode, 
Function.identity()));
+
+        // todo: we need to parse the task parameter from TaskExecutionContext
+        TaskInstance taskInstance =
+                
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
+                        
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
+                        .orElseThrow(() -> new 
MasterTaskExecuteException("Task instance not found"));
+        DependentParameters dependentParameters = taskInstance.getDependency();
+
+        List<DependResult> tempResultList = new ArrayList<>();
+        for (DependentTaskModel dependentTaskModel : 
dependentParameters.getDependTaskList()) {
+            List<DependResult> itemDependResult = new ArrayList<>();
+            for (DependentItem item : dependentTaskModel.getDependItemList()) {
+                itemDependResult.add(getDependResultForItem(item, 
completeTaskList));
+            }
+            DependResult tempResult =
+                    
DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), 
itemDependResult);
+            tempResultList.add(tempResult);
+        }
+        return 
DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), 
tempResultList);
+    }
+
+    private DependResult getDependResultForItem(DependentItem item, Map<Long, 
TaskInstance> completeTaskList) {
+
+        DependResult dependResult = DependResult.SUCCESS;
+        if (!completeTaskList.containsKey(item.getDepTaskCode())) {
+            log.info("depend item: {} have not completed yet.", 
item.getDepTaskCode());
+            dependResult = DependResult.FAILED;

Review Comment:
   :rofl:  I found it's `block` task, not dependent, can we change these method 
name?



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskExecutionContext.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+public class AsyncTaskExecutionContext implements Delayed {
+
+    private final TaskExecutionContext taskExecutionContext;
+
+    private final AsyncTaskExecuteFunction asyncTaskExecuteFunction;
+
+    private final AsyncTaskCallbackFunction asyncTaskCallbackFunction;
+
+    private long currentStartTime;
+    private int executeTimes;
+    private final long executeInterval;
+    private long timeout;
+
+    public AsyncTaskExecutionContext(@NonNull TaskExecutionContext 
taskExecutionContext,
+                                     @NonNull AsyncTaskExecuteFunction 
asyncTaskExecuteFunction,
+                                     @NonNull AsyncTaskCallbackFunction 
asyncTaskCallbackFunction) {
+        this.taskExecutionContext = taskExecutionContext;
+        this.asyncTaskExecuteFunction = asyncTaskExecuteFunction;
+        this.asyncTaskCallbackFunction = asyncTaskCallbackFunction;
+        this.currentStartTime = 0;
+        this.executeTimes = 0;
+        if 
(TaskTimeoutStrategy.FAILED.equals(taskExecutionContext.getTaskTimeoutStrategy())
+                || 
TaskTimeoutStrategy.WARNFAILED.equals(taskExecutionContext.getTaskTimeoutStrategy()))
 {
+            // will timeout
+            this.timeout = taskExecutionContext.getStartTime()
+                    + 
TimeUnit.SECONDS.toMillis(taskExecutionContext.getTaskTimeout());
+        } else {
+            this.timeout = 
TimeUnit.SECONDS.toMillis(taskExecutionContext.getTaskTimeout());
+        }

Review Comment:
   Maybe we should remove the else branch? It seems a bit strange if don't need 
timeout.
   ```suggestion
           } 
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/MasterTaskUpdatePidMessageSender.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.MessageType;
+import org.apache.dolphinscheduler.remote.command.task.TaskUpdatePidMessage;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
+
+import lombok.NonNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MasterTaskUpdatePidMessageSender implements 
MasterMessageSender<TaskUpdatePidMessage> {

Review Comment:
   I think `RuntimeMessage` is better because it not just pid msg. WDYT?
   ```suggestion
   public class MasterTaskUpdateRuntimeMessageSender implements 
MasterMessageSender<TaskUpdatePidMessage> {
   ```



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskUpdatePidAckProcessor.java:
##########
@@ -38,7 +38,7 @@
  */
 @Component
 @Slf4j
-public class TaskUpdatePidAckProcessor implements NettyRequestProcessor {
+public class WorkerTaskUpdatePidAckProcessor implements WorkerRpcProcessor {

Review Comment:
   ```suggestion
   public class WorkerTaskUpdateRuntimeAckProcessor implements 
WorkerRpcProcessor {
   ```



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java:
##########
@@ -423,7 +423,7 @@ private static List<String> skipTaskNode4Switch(TaskNode 
taskNode, Map<String, T
         if (CollectionUtils.isEmpty(switchTaskList)) {
             switchTaskList = new ArrayList<>();
         }
-        conditionResultVoList.remove(resultConditionLocation);
+        // conditionResultVoList.remove(resultConditionLocation);

Review Comment:
   Why remove this code?



##########
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java:
##########
@@ -17,267 +17,229 @@
 
 package org.apache.dolphinscheduler.server.master;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
-
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
-import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
-import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
-import 
org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
-import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import 
org.apache.dolphinscheduler.server.master.runner.task.BlockingTaskProcessor;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.model.TaskNode;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-
-public class BlockingTaskTest {
-
-    /**
-     * TaskNode.runFlag : task can be run normally
-     */
-    public static final String FLOW_NODE_RUN_FLAG_NORMAL = "NORMAL";
-
-    private ProcessService processService;
-
-    private TaskInstanceDao taskInstanceDao;
-
-    private TaskDefinitionDao taskDefinitionDao;
-
-    private ProcessInstance processInstance;
-
-    private MasterConfig config;
-
-    private MockedStatic<SpringApplicationContext> 
mockedStaticSpringApplicationContext;
-
-    @BeforeEach
-    public void before() {
-        // mock master
-        config = new MasterConfig();
-        config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(Duration.ofSeconds(1));
-
-        mockedStaticSpringApplicationContext = 
Mockito.mockStatic(SpringApplicationContext.class);
-        
Mockito.when(SpringApplicationContext.getBean(MasterConfig.class)).thenReturn(config);
-
-        // mock process service
-        processService = Mockito.mock(ProcessService.class);
-        
Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
-
-        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
-        
Mockito.when(SpringApplicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
-
-        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
-        
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
-
-        // mock process instance
-        processInstance = getProcessInstance();
-        Mockito.when(processService
-                .findProcessInstanceById(processInstance.getId()))
-                .thenReturn(processInstance);
-
-        TaskDefinition taskDefinition = new TaskDefinition();
-        taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
-        taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
-        taskDefinition.setTimeout(0);
-        Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
-                .thenReturn(taskDefinition);
-    }
-
-    @AfterEach
-    public void after() {
-        mockedStaticSpringApplicationContext.close();
-    }
-
-    private ProcessInstance getProcessInstance() {
-        // mock process instance
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(1000);
-        processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
-        processInstance.setProcessDefinitionCode(1L);
-
-        return processInstance;
-    }
-
-    private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance 
processInstance) {
-        // wrap taskNode
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(100);
-        taskInstance.setName(taskNode.getName());
-        taskInstance.setTaskType(taskNode.getType().toUpperCase());
-        taskInstance.setTaskCode(taskNode.getCode());
-        taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
-        taskInstance.setProcessInstanceId(processInstance.getId());
-        taskInstance.setTaskParams(taskNode.getTaskParams());
-        taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
-        taskInstance.setFirstSubmitTime(new Date());
-        Mockito.when(processService
-                .submitTaskWithRetry(Mockito.any(ProcessInstance.class), 
Mockito.any(TaskInstance.class),
-                        Mockito.any(Integer.class), Mockito.any(Long.class)))
-                .thenReturn(taskInstance);
-        return taskInstance;
-    }
-
-    private TaskNode getTaskNode(String blockingCondition) {
-        // mock task nodes
-        // 1----\
-        // 2-----4(Blocking Node)
-        // 3----/
-        // blocking logic: 1-->SUCCESS 2-->SUCCESS 3-->SUCCESS
-        TaskNode taskNode = new TaskNode();
-        taskNode.setId("tasks-1000");
-        taskNode.setName("4");
-        taskNode.setCode(1L);
-        taskNode.setVersion(1);
-        taskNode.setType(TASK_TYPE_BLOCKING);
-        taskNode.setRunFlag(FLOW_NODE_RUN_FLAG_NORMAL);
-
-        DependentItem dependentItemA = new DependentItem();
-        dependentItemA.setDepTaskCode(1L);
-        dependentItemA.setStatus(TaskExecutionStatus.SUCCESS);
-
-        DependentItem dependentItemB = new DependentItem();
-        dependentItemB.setDepTaskCode(2L);
-        dependentItemB.setStatus(TaskExecutionStatus.SUCCESS);
-
-        DependentItem dependentItemC = new DependentItem();
-        dependentItemC.setDepTaskCode(3L);
-        dependentItemC.setStatus(TaskExecutionStatus.SUCCESS);
-
-        // build relation
-        DependentTaskModel dependentTaskModel = new DependentTaskModel();
-        dependentTaskModel.setDependItemList(Stream.of(dependentItemA, 
dependentItemB, dependentItemC)
-                .collect(Collectors.toList()));
-        dependentTaskModel.setRelation(DependentRelation.AND);
-
-        DependentParameters dependentParameters = new DependentParameters();
-        
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
-        dependentParameters.setRelation(DependentRelation.AND);
-
-        taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
-
-        // set blocking node params
-        BlockingParameters blockingParameters = new BlockingParameters();
-        blockingParameters.setAlertWhenBlocking(false);
-        blockingParameters.setBlockingCondition(blockingCondition);
-
-        taskNode.setParams(JSONUtils.toJsonString(blockingParameters));
-
-        return taskNode;
-    }
-
-    private TaskInstance testBasicInit(String blockingCondition, 
TaskExecutionStatus... expectResults) {
-
-        TaskInstance taskInstance = 
getTaskInstance(getTaskNode(blockingCondition), processInstance);
-
-        Mockito.when(processService
-                .submitTask(processInstance, taskInstance))
-                .thenReturn(taskInstance);
-
-        Mockito.when(taskInstanceDao
-                .findTaskInstanceById(taskInstance.getId()))
-                .thenReturn(taskInstance);
-
-        // for BlockingTaskExecThread.initTaskParameters
-        Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
-                .thenReturn(true);
-
-        // for BlockingTaskExecThread.updateTaskState
-        Mockito.when(taskInstanceDao
-                .updateTaskInstance(taskInstance))
-                .thenReturn(true);
-
-        // for BlockingTaskExecThread.waitTaskQuit
-        List<TaskInstance> conditions = 
getTaskInstanceForValidTaskList(expectResults);
-        Mockito.when(
-                
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag()))
-                .thenReturn(conditions);
-        taskInstance.setProcessInstance(processInstance);
-        return taskInstance;
-    }
-
-    /**
-     * mock task instance and its execution result in front of blocking node
-     */
-    private List<TaskInstance> 
getTaskInstanceForValidTaskList(TaskExecutionStatus... status) {
-        List<TaskInstance> taskInstanceList = new ArrayList<>();
-        for (int i = 1; i <= status.length; i++) {
-            TaskInstance taskInstance = new TaskInstance();
-            taskInstance.setId(i);
-            taskInstance.setName(String.valueOf(i));
-            taskInstance.setState(status[i - 1]);
-            taskInstanceList.add(taskInstance);
-        }
-        return taskInstanceList;
-    }
-
-    @Test
-    public void testBlockingTaskSubmit() {
-        TaskInstance taskInstance = testBasicInit("BlockingOnFailed",
-                TaskExecutionStatus.SUCCESS, TaskExecutionStatus.FAILURE, 
TaskExecutionStatus.SUCCESS);
-        BlockingTaskProcessor blockingTaskProcessor = new 
BlockingTaskProcessor();
-        blockingTaskProcessor.init(taskInstance, processInstance);
-        boolean res = blockingTaskProcessor.action(TaskAction.SUBMIT);
-        Assertions.assertTrue(res);
-    }
-
-    @Test
-    public void testPauseTask() {
-        TaskInstance taskInstance = testBasicInit("BlockingOnFailed",
-                TaskExecutionStatus.SUCCESS, TaskExecutionStatus.FAILURE, 
TaskExecutionStatus.SUCCESS);
-        BlockingTaskProcessor blockingTaskProcessor = new 
BlockingTaskProcessor();
-        blockingTaskProcessor.init(taskInstance, processInstance);
-        blockingTaskProcessor.action(TaskAction.SUBMIT);
-        blockingTaskProcessor.action(TaskAction.PAUSE);
-        TaskExecutionStatus status = taskInstance.getState();
-        Assertions.assertEquals(TaskExecutionStatus.PAUSE, status);
-    }
-
-    @Test
-    public void testBlocking() {
-        TaskInstance taskInstance = testBasicInit("BlockingOnFailed",
-                TaskExecutionStatus.SUCCESS, TaskExecutionStatus.FAILURE, 
TaskExecutionStatus.SUCCESS);
-        BlockingTaskProcessor blockingTaskProcessor = new 
BlockingTaskProcessor();
-        blockingTaskProcessor.init(taskInstance, processInstance);
-        blockingTaskProcessor.action(TaskAction.SUBMIT);
-        blockingTaskProcessor.action(TaskAction.RUN);
-        WorkflowExecutionStatus status = processInstance.getState();
-        Assertions.assertEquals(WorkflowExecutionStatus.READY_BLOCK, status);
-    }
-
-    @Test
-    public void testNoneBlocking() {
-        TaskInstance taskInstance = testBasicInit("BlockingOnSuccess",
-                TaskExecutionStatus.SUCCESS, TaskExecutionStatus.SUCCESS, 
TaskExecutionStatus.SUCCESS);
-        BlockingTaskProcessor blockingTaskProcessor = new 
BlockingTaskProcessor();
-        blockingTaskProcessor.init(taskInstance, processInstance);
-        blockingTaskProcessor.action(TaskAction.SUBMIT);
-        blockingTaskProcessor.action(TaskAction.RUN);
-        WorkflowExecutionStatus status = processInstance.getState();
-        Assertions.assertEquals(WorkflowExecutionStatus.RUNNING_EXECUTION, 
status);
-    }
-}
+// public class BlockingTaskTest {

Review Comment:
   It's better to update the test case.



##########
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java:
##########
@@ -17,176 +17,143 @@
 
 package org.apache.dolphinscheduler.server.master;
 
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
-import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
-import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
-import 
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
-import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.model.TaskNode;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-import org.springframework.context.ApplicationContext;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-public class ConditionsTaskTest {
-
-    /**
-     * TaskNode.runFlag : task can be run normally
-     */
-    public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
-
-    private ProcessService processService;
-
-    private ProcessInstance processInstance;
-
-    private TaskInstanceDao taskInstanceDao;
-
-    private TaskDefinitionDao taskDefinitionDao;
-
-    @BeforeEach
-    public void before() {
-        ApplicationContext applicationContext = 
Mockito.mock(ApplicationContext.class);
-        SpringApplicationContext springApplicationContext = new 
SpringApplicationContext();
-        springApplicationContext.setApplicationContext(applicationContext);
-
-        MasterConfig config = new MasterConfig();
-        
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
-        config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(Duration.ofSeconds(1));
-
-        processService = Mockito.mock(ProcessService.class);
-        
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
-
-        taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
-        
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
-
-        taskDefinitionDao = Mockito.mock(TaskDefinitionDao.class);
-        
Mockito.when(SpringApplicationContext.getBean(TaskDefinitionDao.class)).thenReturn(taskDefinitionDao);
-
-        processInstance = getProcessInstance();
-        Mockito.when(processService
-                .findProcessInstanceById(processInstance.getId()))
-                .thenReturn(processInstance);
-
-        TaskDefinition taskDefinition = new TaskDefinition();
-        taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
-        taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
-        taskDefinition.setTimeout(0);
-        Mockito.when(taskDefinitionDao.findTaskDefinition(1L, 1))
-                .thenReturn(taskDefinition);
-    }
-
-    private TaskInstance testBasicInit(TaskExecutionStatus expectResult) {
-        TaskInstance taskInstance = getTaskInstance(getTaskNode(), 
processInstance);
-
-        // for MasterBaseTaskExecThread.submit
-        Mockito.when(processService
-                .submitTask(processInstance, taskInstance))
-                .thenReturn(taskInstance);
-        // for MasterBaseTaskExecThread.call
-        Mockito.when(taskInstanceDao
-                .findTaskInstanceById(taskInstance.getId()))
-                .thenReturn(taskInstance);
-        // for ConditionsTaskExecThread.initTaskParameters
-        Mockito.when(taskInstanceDao.upsertTaskInstance(taskInstance))
-                .thenReturn(true);
-        // for ConditionsTaskExecThread.updateTaskState
-        Mockito.when(taskInstanceDao
-                .updateTaskInstance(taskInstance))
-                .thenReturn(true);
-
-        // for ConditionsTaskExecThread.waitTaskQuit
-        List<TaskInstance> conditions = Stream.of(
-                
getTaskInstanceForValidTaskList(expectResult)).collect(Collectors.toList());
-        Mockito.when(taskInstanceDao
-                .findValidTaskListByProcessId(processInstance.getId(), 
processInstance.getTestFlag()))
-                .thenReturn(conditions);
-        return taskInstance;
-    }
-
-    private TaskNode getTaskNode() {
-        TaskNode taskNode = new TaskNode();
-        taskNode.setId("tasks-1000");
-        taskNode.setName("C");
-        taskNode.setCode(1L);
-        taskNode.setVersion(1);
-        taskNode.setType("CONDITIONS");
-        taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
-
-        DependentItem dependentItem = new DependentItem();
-        dependentItem.setDepTaskCode(11L);
-        dependentItem.setStatus(TaskExecutionStatus.SUCCESS);
-
-        DependentTaskModel dependentTaskModel = new DependentTaskModel();
-        
dependentTaskModel.setDependItemList(Stream.of(dependentItem).collect(Collectors.toList()));
-        dependentTaskModel.setRelation(DependentRelation.AND);
-
-        DependentParameters dependentParameters = new DependentParameters();
-        
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
-        dependentParameters.setRelation(DependentRelation.AND);
-
-        // in: AND(AND(1 is SUCCESS))
-        taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
-
-        ConditionsParameters conditionsParameters = new ConditionsParameters();
-        
conditionsParameters.setSuccessNode(Stream.of("2").collect(Collectors.toList()));
-        
conditionsParameters.setFailedNode(Stream.of("3").collect(Collectors.toList()));
-
-        // out: SUCCESS => 2, FAILED => 3
-        
taskNode.setConditionResult(JSONUtils.toJsonString(conditionsParameters));
-
-        return taskNode;
-    }
-
-    private ProcessInstance getProcessInstance() {
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(1000);
-        processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
-
-        return processInstance;
-    }
-
-    private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance 
processInstance) {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1000);
-        taskInstance.setName(taskNode.getName());
-        taskInstance.setTaskType(taskNode.getType().toUpperCase());
-        taskInstance.setTaskCode(taskNode.getCode());
-        taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
-        taskInstance.setProcessInstanceId(processInstance.getId());
-        taskInstance.setTaskParams(taskNode.getTaskParams());
-        return taskInstance;
-    }
-
-    private TaskInstance getTaskInstanceForValidTaskList(TaskExecutionStatus 
state) {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1001);
-        taskInstance.setName("1");
-        taskInstance.setState(state);
-        return taskInstance;
-    }
-}
+// @ExtendWith(MockitoExtension.class)
+// @MockitoSettings(strictness = Strictness.LENIENT)
+// public class ConditionsTaskTest {

Review Comment:
   Same here.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.execute;
+
+import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+import static 
org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
+
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import 
org.apache.dolphinscheduler.server.master.exception.LogicTaskFactoryNotFoundException;
+import 
org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException;
+import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
+import 
org.apache.dolphinscheduler.server.master.runner.message.MasterMessageSenderManager;
+import org.apache.dolphinscheduler.server.master.runner.task.ILogicTask;
+import 
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class MasterTaskExecuteRunnable implements Runnable {
+
+    protected final TaskExecutionContext taskExecutionContext;
+    protected final LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder;
+    protected final MasterMessageSenderManager masterMessageSenderManager;
+    protected ILogicTask logicTask;
+
+    public MasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext,
+                                     LogicTaskPluginFactoryBuilder 
logicTaskPluginFactoryBuilder,
+                                     MasterMessageSenderManager 
masterMessageSenderManager) {
+        this.taskExecutionContext = taskExecutionContext;
+        this.logicTaskPluginFactoryBuilder = logicTaskPluginFactoryBuilder;
+        this.masterMessageSenderManager = masterMessageSenderManager;
+    }
+
+    protected abstract void executeTask() throws MasterTaskExecuteException;
+
+    protected abstract void afterExecute() throws MasterTaskExecuteException;
+
+    protected void afterThrowing(Throwable throwable) {
+        try {
+            cancelTask();
+            log.info("Get a exception when execute the task, canceled the 
task");
+        } catch (Exception e) {
+            // todo: if the task cancel failed, do we need to set the task 
status to failure?
+            log.error("Cancel task failed,", e);
+        }
+        
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
+        sendTaskResult();
+        log.info(
+                "Get a exception when execute the task, sent the task execute 
result to master, the current task execute result is {}",
+                taskExecutionContext.getCurrentExecutionStatus());
+        
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        log.info("Get a exception when execute the task, removed the 
TaskExecutionContext");
+    }
+
+    public void cancelTask() throws MasterTaskExecuteException {
+        if (logicTask != null) {
+            logicTask.kill();
+        }
+    }
+
+    public void pauseTask() throws MasterTaskExecuteException {
+        if (logicTask != null) {
+            logicTask.pause();
+        }
+    }
+
+    public TaskExecutionContext getTaskExecutionContext() {
+        return taskExecutionContext;
+    }
+
+    @Override
+    public void run() {
+        try (
+                final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = 
LogUtils.setWorkflowAndTaskInstanceIDMDC(
+                        taskExecutionContext.getProcessInstanceId(), 
taskExecutionContext.getTaskInstanceId());
+                final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 =
+                        
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) {
+            TaskInstanceLogHeader.printInitializeTaskContextHeader();
+            initializeTask();
+
+            if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {

Review Comment:
   Does it need dry run model in master? Some logic task will be wrong in dry 
run, like `Switch`, `Condition`?



##########
dolphinscheduler-standalone-server/src/main/resources/application.yaml:
##########
@@ -143,7 +143,7 @@ master:
   # master max cpuload avg percentage, only higher than the system cpu load 
average, master server can schedule. default value 1: will use 100% cpu
   max-cpu-load-avg: 1
   # master reserved memory, only lower than system available memory, master 
server can schedule. default value 0.3, only the available memory is higher 
than 30%, master server can schedule.
-  reserved-memory: 0.3
+  reserved-memory: 0.01

Review Comment:
   Please recovery the config, or you should change the comment too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to