jedcunningham commented on code in PR #30108:
URL: https://github.com/apache/airflow/pull/30108#discussion_r1142755002
##########
airflow/jobs/scheduler_job.py:
##########
@@ -152,6 +152,14 @@ def __init__(
self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
+
+ stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", fallback=0)
+ # Fall back to celery.stalled_task_timeout for backward compatibility
+ self._task_queued_timeout = conf.getfloat(
+ "scheduler",
+ "task_queued_timeout",
+ fallback=stalled_task_timeout,
Review Comment:
You don't have to do this fallback manually; Airflow handles it for you once
celery.stalled_task_timeout is marked as a deprecated option.
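A minimal sketch of what the deprecation mechanism buys you, using the standard-library configparser. DEPRECATED_OPTIONS and getfloat are hypothetical illustrations of the concept, not Airflow's configuration code: a lookup for the new key consults the old key automatically, so the caller needs no manual fallback.

```python
import configparser
import warnings

# Hypothetical mapping: (new_section, new_key) -> (old_section, old_key).
# Illustrative only; Airflow maintains its own table of deprecated options.
DEPRECATED_OPTIONS = {
    ("scheduler", "task_queued_timeout"): ("celery", "stalled_task_timeout"),
}


def getfloat(parser, section, key, fallback=None):
    """Read a float, honouring a deprecated alias when the new key is unset."""
    if parser.has_option(section, key):
        return parser.getfloat(section, key)
    old = DEPRECATED_OPTIONS.get((section, key))
    if old is not None and parser.has_option(*old):
        old_section, old_key = old
        warnings.warn(
            f"[{old_section}] {old_key} is deprecated; use [{section}] {key} instead",
            DeprecationWarning,
        )
        return parser.getfloat(old_section, old_key)
    return fallback


parser = configparser.ConfigParser()
parser.read_string("[celery]\nstalled_task_timeout = 300\n")
# The old option is picked up without any fallback= at the call site.
print(getfloat(parser, "scheduler", "task_queued_timeout", fallback=600.0))  # 300.0
```

The new key always wins when both are set; the deprecated key is only a fallback, with a warning nudging users to migrate.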
##########
airflow/jobs/scheduler_job.py:
##########
@@ -1408,6 +1416,40 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
)
self.executor.send_callback(request)
+ @provide_session
+ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
Review Comment:
worker_pods_queued_check_interval is similar, but it differs in that it doesn't
just reset the TI automatically; it first checks whether the pod exists.
worker_pods_pending_timeout, however, is essentially the same process. It
should probably be deprecated as well, though I'm not sure how the config
handles several deprecated options mapping to a single new one.
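To make the distinction concrete, here is a simplified sketch of the two checks. QueuedTask, pod_exists, and both functions are hypothetical; the sketch assumes both checks start from the same age threshold and that the only difference is the pod-existence check described above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, List


@dataclass
class QueuedTask:
    # Hypothetical stand-in for a task instance sitting in the queued state.
    key: str
    queued_at: datetime


def stuck_by_timeout(tasks: List[QueuedTask], now: datetime, timeout: timedelta) -> List[str]:
    """Timeout-only check: anything queued longer than `timeout` is reset."""
    return [t.key for t in tasks if now - t.queued_at > timeout]


def stuck_with_existence_check(
    tasks: List[QueuedTask],
    now: datetime,
    timeout: timedelta,
    pod_exists: Callable[[str], bool],
) -> List[str]:
    """Same candidates, but only tasks whose worker pod is missing are reset."""
    return [k for k in stuck_by_timeout(tasks, now, timeout) if not pod_exists(k)]


now = datetime(2023, 3, 20, 12, 0)
tasks = [
    QueuedTask("a", now - timedelta(minutes=20)),
    QueuedTask("b", now - timedelta(minutes=1)),
    QueuedTask("c", now - timedelta(minutes=30)),
]
print(stuck_by_timeout(tasks, now, timedelta(minutes=10)))  # ['a', 'c']
print(stuck_with_existence_check(tasks, now, timedelta(minutes=10), lambda k: k == "a"))  # ['c']
```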
##########
airflow/jobs/scheduler_job.py:
##########
@@ -853,6 +858,10 @@ def _run_scheduler_loop(self) -> None:
# Check on start up, then every configured interval
self.adopt_or_reset_orphaned_tasks()
+ if self._task_queued_timeout:
+ self._fail_tasks_stuck_in_queued()
+ timers.call_regular_interval(self._task_queued_timeout, self._fail_tasks_stuck_in_queued)
Review Comment:
This probably doesn't need to run at startup. Running adoption at startup makes
sense because, if a new scheduler is starting, it's fairly likely that another
scheduler just shut down, but I don't think we have a similar situation here.
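A minimal sketch of the suggested change, using the standard-library sched module with a fake clock so it runs instantly. The call_regular_interval below is a simplified stand-in, assumed to behave like timers.call_regular_interval in the diff (first run after one interval, as the explicit startup call there implies); FakeClock is hypothetical scaffolding. Without the explicit startup call, the first check happens one interval after the scheduler starts.

```python
import sched
from typing import Callable, List


def call_regular_interval(scheduler: sched.scheduler, delay: float, action: Callable[[], None]) -> None:
    """Run `action` every `delay` seconds; the first run happens after one delay."""

    def repeat() -> None:
        action()
        scheduler.enter(delay, 1, repeat)  # reschedule relative to "now"

    scheduler.enter(delay, 1, repeat)


class FakeClock:
    """Hypothetical virtual clock so the demo needs no real sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
timers = sched.scheduler(clock.time, clock.sleep)
checks: List[float] = []
call_regular_interval(timers, 60.0, lambda: checks.append(clock.time()))

# Advance time in 30 s steps and run whatever is due at each step.
for t in range(0, 181, 30):
    clock.now = float(t)
    timers.run(blocking=False)

print(checks)  # [60.0, 120.0, 180.0]: no check at t=0
```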
--
This is an automated message from the Apache Git Service.