jedcunningham commented on code in PR #30375:
URL: https://github.com/apache/airflow/pull/30375#discussion_r1165711032


##########
airflow/configuration.py:
##########
@@ -236,6 +236,16 @@ class AirflowConfigParser(ConfigParser):
         ("scheduler", "parsing_cleanup_interval"): ("scheduler", 
"deactivate_stale_dags_interval", "2.5.0"),
     }
 
+    # A mapping of new configurations to a list of old configurations for when one configuration
+    # deprecates more than one other deprecation.

Review Comment:
   ```suggestion
       # deprecates more than one other deprecation.
        # There is no special handling for values set under the old options - you must handle that yourself.
   ```
   
   Or something like that. Let's be explicit.
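   
   For context, a minimal sketch of what an entry in such a mapping could look like once the suggested comment is applied. The mapping name, the type annotation, and the "2.6.0" version strings are assumptions for illustration only, not necessarily what this PR ends up committing:
   
   ```python
   # A mapping of new configurations to a list of old configurations for when one configuration
   # deprecates more than one other deprecation.
   # There is no special handling for values set under the old options - you must handle that yourself.
   many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
       ("scheduler", "task_queued_timeout"): [
           ("celery", "stalled_task_timeout", "2.6.0"),
           ("celery", "task_adoption_timeout", "2.6.0"),
           ("kubernetes", "worker_pods_pending_timeout", "2.6.0"),
       ],
   }
   ```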



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -854,6 +815,45 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         tis_to_flush.extend(pod_ids.values())
         return tis_to_flush
 
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
+        """
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
+
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
+        """
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))

Review Comment:
   ```suggestion
   ```
   
   Nit: these next few are just refactoring how this is built to be simpler.
   
   1 of 4



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1427,6 +1463,53 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
         )
         self.job.executor.send_callback(request)
 
+    @provide_session
+    def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
+        """
+        Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
+
+        If one of the configuration settings `kubernetes.worker_pods_pending_timeout`,
+        `celery.stalled_task_timeout`, or `celery.task_adoption_timeout` is set and
+        has a higher value than `scheduler.task_queued_timeout`, that value will be
+        used as `task_queued_timeout`.
+

Review Comment:
   ```suggestion
   ```
   
   Not sure we need this comment here; this is handled in a different scope.
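   
   For reference, the fallback that paragraph describes (use the largest of the deprecated timeouts when it exceeds `scheduler.task_queued_timeout`) could be resolved in that other scope roughly like this; the helper name and the exact `conf` lookups are assumptions for illustration, not this PR's actual implementation:
   
   ```python
   from airflow.configuration import conf
   
   def _resolve_task_queued_timeout() -> float:
       # Hypothetical sketch: start from scheduler.task_queued_timeout and bump it
       # up to the largest deprecated timeout, if any of them is set higher.
       timeout = conf.getfloat("scheduler", "task_queued_timeout")
       for section, key in (
           ("kubernetes", "worker_pods_pending_timeout"),
           ("celery", "stalled_task_timeout"),
           ("celery", "task_adoption_timeout"),
       ):
           timeout = max(timeout, conf.getfloat(section, key, fallback=0.0))
       return timeout
   ```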



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -854,6 +815,45 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         tis_to_flush.extend(pod_ids.values())
         return tis_to_flush
 
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
+        """
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
+
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
+        """
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = self.kube_client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                # Remove from list of tis that were cleaned up
+                readable_tis.pop()
+                self.log.warning("Cannot find pod for ti %s", ti)
+                continue
+            elif len(pod_list) > 1:
+                # Remove from list of tis that were cleaned up
+                readable_tis.pop()
+                self.log.warning("Found multiple pods for ti %s: %s", ti, 
pod_list)
+                continue
+            self.kube_scheduler.delete_pod(pod_id=pod_list[0].metadata.name, namespace=namespace)

Review Comment:
   ```suggestion
               readable_tis.append(repr(ti))
               self.kube_scheduler.delete_pod(pod_id=pod_list[0].metadata.name, namespace=namespace)
   ```
   
   4 of 4
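   
   Taken together, the four parts (the other two appear further down) just move the `readable_tis.append(...)` so a ti is only recorded after its pod is actually being deleted. A rough sketch of the resulting loop, for reference only:
   
   ```python
        readable_tis = []
        for ti in tis:
            selector = PodGenerator.build_selector_for_k8s_executor_pod(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                try_number=ti.try_number,
                map_index=ti.map_index,
                run_id=ti.run_id,
                airflow_worker=ti.queued_by_job_id,
            )
            namespace = self._get_pod_namespace(ti)
            pod_list = self.kube_client.list_namespaced_pod(
                namespace=namespace,
                label_selector=selector,
            ).items
            if not pod_list:
                self.log.warning("Cannot find pod for ti %s", ti)
                continue
            elif len(pod_list) > 1:
                self.log.warning("Found multiple pods for ti %s: %s", ti, pod_list)
                continue
            # Only record the ti once its pod is actually being deleted.
            readable_tis.append(repr(ti))
            self.kube_scheduler.delete_pod(pod_id=pod_list[0].metadata.name, namespace=namespace)
        return readable_tis
   ```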



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1427,6 +1463,53 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
         )
         self.job.executor.send_callback(request)
 
+    @provide_session
+    def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
+        """
+        Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
+
+        If one of the configuration settings `kubernetes.worker_pods_pending_timeout`,
+        `celery.stalled_task_timeout`, or `celery.task_adoption_timeout` is set and
+        has a higher value than `scheduler.task_queued_timeout`, that value will be
+        used as `task_queued_timeout`.
+
+        Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
+        track of a task, a cluster can't further scale up its workers, etc.), but tasks
+        should not be stuck in queued for a long time. This will mark tasks stuck in
+        queued for longer than `self._task_queued_timeout` as failed. If the task has
+        available retries, it will be retried.
+        """
+        self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued 
method")
+        for attempt in run_with_db_retries(logger=self.log):
+            with attempt:
+                self.log.debug(
+                    "Running SchedulerJob._fail_tasks_stuck_in_queued with 
retries. Try %d of %d",
+                    attempt.retry_state.attempt_number,
+                    MAX_DB_RETRIES,
+                )
+                try:
+                    query = session.query(TI).filter(
+                        TI.state == State.QUEUED,
+                        TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
+                    )
+                    tasks_stuck_in_queued: list[TaskInstance] = with_row_locks(
+                        query, of=TI, session=session, **skip_locked(session=session)
+                    ).all()
+                    print(f"\n\ntis: {tasks_stuck_in_queued}\n\n")

Review Comment:
   ```suggestion
                       print(f"\n\ntis: {tasks_stuck_in_queued}\n\n")
   ```
   
   Leftover debugging? Or, switch this to a debug or warning log instead (with some extra context).
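   
   If the intent is the latter, a minimal replacement for the `print` could look like this; the wording and log level are only a suggestion:
   
   ```python
                    if tasks_stuck_in_queued:
                        self.log.warning(
                            "Marking %d task instance(s) stuck in queued for more than %s seconds as failed: %s",
                            len(tasks_stuck_in_queued),
                            self._task_queued_timeout,
                            tasks_stuck_in_queued,
                        )
   ```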



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -854,6 +815,45 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         tis_to_flush.extend(pod_ids.values())
         return tis_to_flush
 
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
+        """
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
+
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
+        """
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = self.kube_client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                # Remove from list of tis that were cleaned up
+                readable_tis.pop()

Review Comment:
   ```suggestion
   ```
   
   2 of 4



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -854,6 +815,45 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
         tis_to_flush.extend(pod_ids.values())
         return tis_to_flush
 
+    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
+        """
+        Handle remnants of tasks that were failed because they were stuck in queued.
+        Tasks can get stuck in queued. If such a task is detected, it will be marked
+        as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
+        if it doesn't.
+
+        :param tis: List of Task Instances to clean up
+        :return: List of readable task instances for a warning message
+        """
+        readable_tis = []
+        for ti in tis:
+            readable_tis.append(repr(ti))
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = self.kube_client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                # Remove from list of tis that were cleaned up
+                readable_tis.pop()
+                self.log.warning("Cannot find pod for ti %s", ti)
+                continue
+            elif len(pod_list) > 1:
+                # Remove from list of tis that were cleaned up
+                readable_tis.pop()

Review Comment:
   ```suggestion
   ```
   
   3 of 4


