ruanwenjun commented on code in PR #9955:
URL: https://github.com/apache/dolphinscheduler/pull/9955#discussion_r912302035


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java:
##########
@@ -150,6 +172,20 @@ public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQue
         List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
         CountDownLatch latch = new CountDownLatch(fetchTaskNum);
 
+        // put the failed dispatch task into the dispatch queue again
+        for (int i = 0; i < fetchTaskNum; i++) {
+            TaskPriority dispatchFailedTaskPriority = taskPriorityDispatchFailedQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
+            if (Objects.isNull(dispatchFailedTaskPriority)){
+                continue;
+            }
+            if (canRetry(dispatchFailedTaskPriority)){
+                dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1);
+                taskPriorityQueue.put(dispatchFailedTaskPriority);
+            } else {
+                taskPriorityDispatchFailedQueue.put(dispatchFailedTaskPriority);
+            }
+        }

Review Comment:
   It's not a good idea to handle the dispatch-failed queue in the normal dispatch path.
   Most of the time taskPriorityDispatchFailedQueue is empty, so with the default dispatchTaskNum of 3 you will wait about 3s in every loop here: each of the three poll calls blocks for the full Constants.SLEEP_TIME_MILLIS timeout (1s) before returning null.
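
   To illustrate the cost, here is a minimal sketch, not the PR's code. It assumes a plain LinkedBlockingQueue as a stand-in for taskPriorityDispatchFailedQueue; the class and method names are hypothetical. It times the per-slot timed poll against a non-blocking drainTo, which returns immediately when the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a String queue replaces the real TaskPriority queue.
public class FailedQueuePollSketch {

    // Mirrors the loop in the diff: one timed poll per fetch slot.
    // Returns the elapsed wall-clock time in milliseconds.
    static long timedPollAll(BlockingQueue<String> failedQueue, int fetchTaskNum, long timeoutMillis) {
        long start = System.nanoTime();
        for (int i = 0; i < fetchTaskNum; i++) {
            try {
                // On an empty queue this blocks for the whole timeout, then returns null.
                failedQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Non-blocking alternative: take whatever is already queued, up to maxTasks.
    static List<String> drainWithoutWaiting(BlockingQueue<String> failedQueue, int maxTasks) {
        List<String> drained = new ArrayList<>();
        failedQueue.drainTo(drained, maxTasks);
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<String> failedQueue = new LinkedBlockingQueue<>();

        // Empty queue, 3 slots, 100 ms timeout each: the loop stalls for roughly 3 x 100 ms.
        long elapsed = timedPollAll(failedQueue, 3, 100);
        System.out.println("timed polls on an empty queue took about " + elapsed + " ms");

        // drainTo does not wait, whether or not the queue holds anything.
        failedQueue.add("task-1");
        List<String> drained = drainWithoutWaiting(failedQueue, 3);
        System.out.println("drained without waiting: " + drained);
    }
}
```

   Whether draining in the consumer loop or in a separate retry thread is the better fit depends on the rest of the master's design; the sketch only shows where the stall comes from.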



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java:
##########
@@ -129,11 +149,13 @@ public void run() {
                 if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
                     TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
                     for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
-                        taskPriorityQueue.put(dispatchFailedTask);
-                    }
-                    // If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
-                    if (fetchTaskNum == failedDispatchTasks.size()) {
-                        TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                        // service alarm when retries 100 times
+                        if (dispatchFailedTask.getDispatchFailedRetryTimes() == Constants.DEFAULT_MAX_RETRY_COUNT){
+                            logger.error("the number of retries for dispatch failure has exceeded the maximum limit, taskId: {} processInstanceId: {}", dispatchFailedTask.getTaskId(), dispatchFailedTask.getProcessInstanceId());
+                            // business alarm
+                        }
+                        // differentiate the queue to prevent high priority from affecting the execution of other tasks
+                        taskPriorityDispatchFailedQueue.put(dispatchFailedTask);

Review Comment:
   So if a task exceeds the maximum retry count, will it still be retried under your plan? As written, it is put back into taskPriorityDispatchFailedQueue either way.
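
   For comparison, a minimal sketch of a cap that actually ends the retries. This is an illustration under stated assumptions, not the PR's code: the Task class, requeueOrGiveUp, and the limit of 100 (taken from the "retries 100 times" comment in the diff) are hypothetical stand-ins. It uses >= rather than == so a task that has passed the limit cannot slip through.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in types; the real TaskPriority and Constants are not reproduced here.
public class RetryCapSketch {

    // Assumed value, based on the "retries 100 times" comment in the diff.
    static final int MAX_RETRY_COUNT = 100;

    static final class Task {
        final int taskId;
        int dispatchFailedRetryTimes;

        Task(int taskId, int dispatchFailedRetryTimes) {
            this.taskId = taskId;
            this.dispatchFailedRetryTimes = dispatchFailedRetryTimes;
        }
    }

    // Returns true if the task was queued for another attempt,
    // false if its retry budget is spent and the caller should alert or fail it.
    static boolean requeueOrGiveUp(Task task, Queue<Task> failedQueue) {
        if (task.dispatchFailedRetryTimes >= MAX_RETRY_COUNT) {
            // Budget exhausted: do not put the task back into any queue.
            return false;
        }
        task.dispatchFailedRetryTimes++;
        failedQueue.add(task);
        return true;
    }

    public static void main(String[] args) {
        Queue<Task> failedQueue = new ArrayDeque<>();

        Task fresh = new Task(1, 0);
        System.out.println("fresh task requeued: " + requeueOrGiveUp(fresh, failedQueue));

        Task spent = new Task(2, MAX_RETRY_COUNT);
        System.out.println("spent task requeued: " + requeueOrGiveUp(spent, failedQueue));
    }
}
```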



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

