njnu-seafish commented on code in PR #17796:
URL:
https://github.com/apache/dolphinscheduler/pull/17796#discussion_r2626081271
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +95,77 @@ public void run() {
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskId = taskExecutionRunnable.getId();
+ final TaskExecutionContext taskExecutionContext =
taskExecutionRunnable.getTaskExecutionContext();
+ final long timeoutMs = this.dispatchTimeout.toMillis();
try {
- if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId()))
{
+ if (!waitingDispatchTaskIds.remove(taskId)) {
log.info(
"The task: {} doesn't exist in
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
- taskExecutionRunnable.getId());
+ taskId);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
+ } catch (TaskDispatchException ex) {
+ // Checks whether the given task has exceeded its allowed dispatch
timeout.
+ long elapsed = System.currentTimeMillis() -
taskExecutionContext.getFirstDispatchEnqueueTimeMs();
+ if (elapsed > timeoutMs) {
+ handleDispatchFailure(taskExecutionRunnable, ex, elapsed,
timeoutMs);
+ return;
+ }
+
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not
exceed 60 seconds
- long waitingTimeMills = Math.min(
+ long waitingTimeMillis = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
- dispatchTask(taskExecutionRunnable, waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskExecutionRunnable.getId(),
- waitingTimeMills, e);
+ dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+ log.warn("Dispatch Task: {} failed will retry after: {}/ms",
taskId,
+ waitingTimeMillis, ex);
+ }
+ }
+
+ /**
+ * Marks the specified task as fatally failed due to an unrecoverable
dispatch error,such as timeout or persistent client failure.
+ * Once this method is called, the task is considered permanently failed
and will not be retried.
+ *
+ * @param taskExecutionRunnable the task to mark as fatally failed; must
not be null
+ * @param exception the dispatch exception that triggered this
failure handling; must not be null
+ * @param elapsed the time (in milliseconds) already spent
attempting to dispatch the task
+ * @param timeoutMs the configured dispatch timeout threshold
(in milliseconds)
+ */
+ private void handleDispatchFailure(ITaskExecutionRunnable
taskExecutionRunnable, TaskDispatchException exception,
Review Comment:
> Please don't print the `taskId` and `workflowId` here; all IDs should
already be added via the MDC. We should only need to print the exception here, since the
`exception` already contains the failure message.
ok
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]