ruanwenjun commented on code in PR #17796:
URL: 
https://github.com/apache/dolphinscheduler/pull/17796#discussion_r2715585636


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java:
##########
@@ -97,6 +99,18 @@ public void validate(Object target, Errors errors) {
         if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
             errors.rejectValue("worker-group-refresh-interval", null, "should 
>= 10s");
         }
+
+        TaskDispatchPolicy dispatchPolicy = 
masterConfig.getTaskDispatchPolicy();
+        if (dispatchPolicy != null && 
dispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+            if (dispatchPolicy.getMaxTaskDispatchDuration() == null) {
+                
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+                        "must be specified when dispatch timeout checker is 
enabled");
+            } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() 
<= 0) {
+                
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+                        "must be a positive duration (e.g., '2m', '5m', 
'30m')");
+            }
+        }

Review Comment:
   Can `dispatchPolicy` be null here? If not, the null check is unnecessary.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +93,69 @@ public void run() {
     }
 
     private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        final int taskInstanceId = taskExecutionRunnable.getId();
+        final TaskExecutionContext taskExecutionContext = 
taskExecutionRunnable.getTaskExecutionContext();
         try {
-            if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) 
{
+            if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
                 log.info(
                         "The task: {} doesn't exist in 
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
-                        taskExecutionRunnable.getId());
+                        taskInstanceId);
                 return;
             }
             taskExecutorClient.dispatch(taskExecutionRunnable);
-        } catch (Exception e) {
+        } catch (Exception ex) {
+            if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+                // If a dispatch timeout occurs, the task will not be put back 
into the queue.
+                long timeoutMs = 
this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();

Review Comment:
   Same concern as raised earlier in this PR: https://github.com/apache/dolphinscheduler/pull/17796#discussion_r2623030826



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +93,69 @@ public void run() {
     }
 
     private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        final int taskInstanceId = taskExecutionRunnable.getId();
+        final TaskExecutionContext taskExecutionContext = 
taskExecutionRunnable.getTaskExecutionContext();
         try {
-            if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) 
{
+            if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
                 log.info(
                         "The task: {} doesn't exist in 
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
-                        taskExecutionRunnable.getId());
+                        taskInstanceId);
                 return;
             }
             taskExecutorClient.dispatch(taskExecutionRunnable);
-        } catch (Exception e) {
+        } catch (Exception ex) {
+            if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+                // If a dispatch timeout occurs, the task will not be put back 
into the queue.
+                long timeoutMs = 
this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
+                long elapsed = System.currentTimeMillis() - 
taskExecutionContext.getFirstDispatchTime();
+                if (elapsed > timeoutMs) {
+                    handleDispatchFailure(taskExecutionRunnable, ex, elapsed, 
timeoutMs);
+                    return;
+                }
+            }
+
             // If dispatch failed, will put the task back to the queue
             // The task will be dispatched after waiting time.
             // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
-            long waitingTimeMills = Math.min(
+            long waitingTimeMillis = Math.min(
                     
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
-            dispatchTask(taskExecutionRunnable, waitingTimeMills);
-            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskExecutionRunnable.getId(),
-                    waitingTimeMills, e);
+            dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+            log.warn("Dispatch Task: {} failed will retry after: {}/ms", 
taskInstanceId,
+                    waitingTimeMillis, ex);
+        }
+    }
+
+    /**
+     * Marks the specified task as fatally failed due to an unrecoverable 
dispatch error,such as timeout
+     * Once this method is called, the task is considered permanently failed 
and will not be retried.
+     */
+    private void handleDispatchFailure(ITaskExecutionRunnable 
taskExecutionRunnable, Exception ex,
+                                       long elapsed, long timeoutMs) {
+        final String taskName = taskExecutionRunnable.getName();
+
+        log.warn("Dispatch fail, taskName: {}, timed out after {} ms (limit: 
{} ms))", taskName, elapsed, timeoutMs);
+
+        if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) {
+            log.error("Dispatch fail, taskName: {}, Worker group not found.", 
taskName, ex);
+            final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .endTime(new Date())
+                    .build();
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+        } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) {
+            log.error("Dispatch fail, taskName: {}, No available worker.", 
taskName, ex);
+            final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .endTime(new Date())
+                    .build();
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+        } else {
+            log.error("Dispatch fail, taskName: {}, Unexpected dispatch 
error.", taskName, ex);
+            final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
+                    .taskExecutionRunnable(taskExecutionRunnable)
+                    .endTime(new Date())
+                    .build();
+            
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);

Review Comment:
   ```suggestion
           log.error("Task: {} dispatch timeout.", taskName, ex);
           final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
                       .taskExecutionRunnable(taskExecutionRunnable)
                       .endTime(new Date())
                       .build();
           taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
   ```



##########
dolphinscheduler-master/src/main/resources/application.yaml:
##########
@@ -111,6 +111,11 @@ master:
     # Master max concurrent workflow instances, when the master's workflow 
instance count exceeds this value, master server will be marked as busy.
     max-concurrent-workflow-instances: 2147483647
   worker-group-refresh-interval: 5m
+  # Task dispatch timeout check (currently disabled).
+  # When enabled, tasks not dispatched within this duration are marked as 
failed.
+  task-dispatch-policy:
+    dispatch-timeout-failed-enabled: false
+    max-task-dispatch-duration: 5m

Review Comment:
   ```suggestion
       max-task-dispatch-duration: 1h
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.config;
+
+import java.time.Duration;
+
+import lombok.Data;
+
+/**
+ * Configuration for the master's task dispatch policy.
+ * When enabled, tasks that remain in the dispatch queue longer than
+ * {@link #maxTaskDispatchDuration} will be marked as failed to prevent 
indefinite queuing.
+ */
+@Data
+public class TaskDispatchPolicy {
+
+    /**
+     * Indicates whether the dispatch timeout checking mechanism is enabled.
+     */
+    private boolean dispatchTimeoutFailedEnabled = false;

Review Comment:
   ```suggestion
       private boolean dispatchTimeoutEnabled = false;
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +93,69 @@ public void run() {
     }
 
     private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        final int taskInstanceId = taskExecutionRunnable.getId();
+        final TaskExecutionContext taskExecutionContext = 
taskExecutionRunnable.getTaskExecutionContext();
         try {
-            if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) 
{
+            if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
                 log.info(
                         "The task: {} doesn't exist in 
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
-                        taskExecutionRunnable.getId());
+                        taskInstanceId);
                 return;
             }
             taskExecutorClient.dispatch(taskExecutionRunnable);
-        } catch (Exception e) {
+        } catch (Exception ex) {
+            if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+                // If a dispatch timeout occurs, the task will not be put back 
into the queue.
+                long timeoutMs = 
this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
+                long elapsed = System.currentTimeMillis() - 
taskExecutionContext.getFirstDispatchTime();
+                if (elapsed > timeoutMs) {
+                    handleDispatchFailure(taskExecutionRunnable, ex, elapsed, 
timeoutMs);
+                    return;
+                }
+            }
+
             // If dispatch failed, will put the task back to the queue
             // The task will be dispatched after waiting time.
             // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
-            long waitingTimeMills = Math.min(
+            long waitingTimeMillis = Math.min(
                     
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
-            dispatchTask(taskExecutionRunnable, waitingTimeMills);
-            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskExecutionRunnable.getId(),
-                    waitingTimeMills, e);
+            dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+            log.warn("Dispatch Task: {} failed will retry after: {}/ms", 
taskInstanceId,
+                    waitingTimeMillis, ex);
+        }
+    }
+
+    /**
+     * Marks the specified task as fatally failed due to an unrecoverable 
dispatch error,such as timeout
+     * Once this method is called, the task is considered permanently failed 
and will not be retried.
+     */
+    private void handleDispatchFailure(ITaskExecutionRunnable 
taskExecutionRunnable, Exception ex,

Review Comment:
   ```suggestion
       private void onDispatchTimeout(ITaskExecutionRunnable 
taskExecutionRunnable, Exception ex,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to