Radeity commented on code in PR #16069:
URL: 
https://github.com/apache/dolphinscheduler/pull/16069#discussion_r1614473832


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -64,29 +58,34 @@ public synchronized void start() {
 
     @Override
     public void run() {
-        DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
         while (RUNNING_FLAG.get()) {
-            defaultTaskExecuteRunnable = 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
-            try {
-                TaskExecutionStatus status = 
defaultTaskExecuteRunnable.getTaskInstance().getState();
-                if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status 
!= TaskExecutionStatus.DELAY_EXECUTION) {
-                    log.warn("The TaskInstance {} state is : {}, will not 
dispatch",
-                            
defaultTaskExecuteRunnable.getTaskInstance().getName(), status);
-                    continue;
-                }
+            doDispatch();
+        }
+    }
 
-                TaskDispatcher taskDispatcher =
-                        
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
-                taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
-                DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
-            } catch (Exception e) {
-                
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
-                
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
-                if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > 
MAX_DISPATCHED_FAILED_TIMES) {
-                    ThreadUtils.sleep(10 * 1000L);
-                }
-                log.error("Dispatch Task: {} failed", 
defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
+    void doDispatch() {
+        final TaskExecuteRunnable taskExecuteRunnable = 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
+        TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
+        if (taskInstance == null) {
+            // This case shouldn't happen, but if it does, log an error and 
continue
+            log.error("The TaskInstance is null, drop it(This case shouldn't 
happen)");
+            return;
+        }
+        try {
+            TaskExecutionStatus status = taskInstance.getState();
+            if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != 
TaskExecutionStatus.DELAY_EXECUTION) {
+                log.warn("The TaskInstance {} state is : {}, will not 
dispatch", taskInstance.getName(), status);
+                return;
             }
+            
taskDispatchFactory.getTaskDispatcher(taskInstance).dispatchTask(taskExecuteRunnable);
+        } catch (Exception e) {
+            // If dispatch failed, will put the task back to the queue
+            // The task will be dispatched after waiting time.
+            // the waiting time will increase exponentially, but will not 
exceed 60 seconds
+            long waitingTimeMills = Math.max(
+                    
taskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);

Review Comment:
   It seems this adds 1s for each failed dispatch (linear growth), not exponential growth as the comment says.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to