ephraimbuddy commented on code in PR #30375:
URL: https://github.com/apache/airflow/pull/30375#discussion_r1157484492


##########
airflow/jobs/scheduler_job.py:
##########
@@ -1408,6 +1443,40 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> 
None:
         )
         self.executor.send_callback(request)
 
+    @provide_session
+    def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.

Review Comment:
   In the doc string here, we are using `task_queued_timeout` but 
self._task_queued_timeout is a max of many other configs



##########
airflow/jobs/scheduler_job.py:
##########
@@ -152,6 +152,43 @@ def __init__(
         self._zombie_threshold_secs = conf.getint("scheduler", 
"scheduler_zombie_task_threshold")
         self._standalone_dag_processor = conf.getboolean("scheduler", 
"standalone_dag_processor")
         self._dag_stale_not_seen_duration = conf.getint("scheduler", 
"dag_stale_not_seen_duration")
+
+        # Since the functionality for stalled_task_timeout, 
task_adoption_timeout, and worker_pods_pending_timeout
+        # are now handled by a single config (task_queued_timeout), we can't 
deprecate them as we normally would.
+        # So, we'll read each config and take the max value in order to ensure 
we're not undercutting a legitimate
+        # use of any of these configs.
+        stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", 
fallback=0)
+        if stalled_task_timeout:
+            # TODO: Remove in Airflow 3.0
+            warnings.warn(
+                "The 'stalled_task_timeout' parameter is deprecated. "
+                "Please use 'scheduler.task_queued_timeout'.",
+                RemovedInAirflow3Warning,
+                stacklevel=2,
+            )
+        task_adoption_timeout = conf.getfloat("celery", 
"task_adoption_timeout", fallback=0)
+        if task_adoption_timeout:
+            # TODO: Remove in Airflow 3.0
+            warnings.warn(
+                "The 'task_adoption_timeout' parameter is deprecated. "
+                "Please use 'scheduler.task_queued_timeout'.",
+                RemovedInAirflow3Warning,
+                stacklevel=2,
+            )
+        worker_pods_pending_timeout = conf.getfloat("kubernetes", 
"worker_pods_pending_timeout", fallback=0)
+        if worker_pods_pending_timeout:
+            # TODO: Remove in Airflow 3.0
+            warnings.warn(
+                "The 'worker_pods_pending_timeout' parameter is deprecated. "
+                "Please use 'scheduler.task_queued_timeout'.",
+                RemovedInAirflow3Warning,
+                stacklevel=2,
+            )
+        task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
+        self._task_queued_timeout = max(
+            stalled_task_timeout, task_adoption_timeout, 
worker_pods_pending_timeout, task_queued_timeout
+        )

Review Comment:
   Why are we taking the max value? Won't it be better to have a single config 
value? Having a single point of truth is better in my opinion.



##########
airflow/jobs/scheduler_job.py:
##########
@@ -1408,6 +1443,40 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> 
None:
         )
         self.executor.send_callback(request)
 
+    @provide_session
+    def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+        Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
+        track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
+        should not be stuck in queued for a long time. This will mark tasks 
stuck in
+        queued for longer than `self._task_queued_timeout` as failed. If the 
task has
+        available retries, it will be retried.
+        """
+        self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued 
method")
+        try:

Review Comment:
   Instead of this try, I think we should use `run_with_db_retries` see 
`adopt_or_reset_orphaned_tasks` below



##########
airflow/jobs/scheduler_job.py:
##########
@@ -152,6 +152,43 @@ def __init__(
         self._zombie_threshold_secs = conf.getint("scheduler", 
"scheduler_zombie_task_threshold")
         self._standalone_dag_processor = conf.getboolean("scheduler", 
"standalone_dag_processor")
         self._dag_stale_not_seen_duration = conf.getint("scheduler", 
"dag_stale_not_seen_duration")
+
+        # Since the functionality for stalled_task_timeout, 
task_adoption_timeout, and worker_pods_pending_timeout
+        # are now handled by a single config (task_queued_timeout), we can't 
deprecate them as we normally would.
+        # So, we'll read each config and take the max value in order to ensure 
we're not undercutting a legitimate
+        # use of any of these configs.
+        stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", 
fallback=0)
+        if stalled_task_timeout:
+            # TODO: Remove in Airflow 3.0
+            warnings.warn(
+                "The 'stalled_task_timeout' parameter is deprecated. "
+                "Please use 'scheduler.task_queued_timeout'.",
+                RemovedInAirflow3Warning,
+                stacklevel=2,

Review Comment:
   Registering the deprecation at 
https://github.com/apache/airflow/blob/85427755f30d72e6c401ddc01b694b8a181c9a96/airflow/configuration.py#L175-L179
 is enough. Airflow will automatically warn. No need to have another warning



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to