njnu-seafish commented on code in PR #17796:
URL:
https://github.com/apache/dolphinscheduler/pull/17796#discussion_r2719725674
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +93,69 @@ public void run() {
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskInstanceId = taskExecutionRunnable.getId();
+ final TaskExecutionContext taskExecutionContext =
taskExecutionRunnable.getTaskExecutionContext();
try {
- if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId()))
{
+ if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
log.info(
"The task: {} doesn't exist in
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
- taskExecutionRunnable.getId());
+ taskInstanceId);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
+ } catch (Exception ex) {
+ if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+ // If a dispatch timeout occurs, the task will not be put back
into the queue.
+ long timeoutMs =
this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
+ long elapsed = System.currentTimeMillis() -
taskExecutionContext.getFirstDispatchTime();
+ if (elapsed > timeoutMs) {
+ handleDispatchFailure(taskExecutionRunnable, ex, elapsed,
timeoutMs);
+ return;
+ }
+ }
+
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not
exceed 60 seconds
- long waitingTimeMills = Math.min(
+ long waitingTimeMillis = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
- dispatchTask(taskExecutionRunnable, waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskExecutionRunnable.getId(),
- waitingTimeMills, e);
+ dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+ log.warn("Dispatch Task: {} failed will retry after: {}/ms",
taskInstanceId,
+ waitingTimeMillis, ex);
+ }
+ }
+
+ /**
+ * Marks the specified task as fatally failed due to an unrecoverable
dispatch error,such as timeout
+ * Once this method is called, the task is considered permanently failed
and will not be retried.
+ */
+ private void handleDispatchFailure(ITaskExecutionRunnable
taskExecutionRunnable, Exception ex,
Review Comment:
> onDispatchTimeout

`onDispatchTimeout` would be a better name for this method than `handleDispatchFailure`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]