This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bada372  Fix stuck "queued" tasks in KubernetesExecutor (#18152)
bada372 is described below

commit bada372ce3c06e30dd47813c21aba2b979cf111b
Author: Andrew Godwin <[email protected]>
AuthorDate: Mon Sep 20 15:20:56 2021 -0600

    Fix stuck "queued" tasks in KubernetesExecutor (#18152)
    
    There are a set of circumstances where TaskInstances can get "stuck" in the 
QUEUED state when they are running under KubernetesExecutor, where they claim 
to have a pod scheduled (and so are queued) but do not actually have one, and 
so sit there forever.
    
    It appears this happens occasionally with reschedule sensors and now more 
often with deferrable tasks, when the task instance defers/reschedules and then 
resumes before the old pod has vanished. It would also, I believe, happen when 
the Executor hard-exits with items still in its internal queues.
    
    There was a pre-existing method in there to clean up stuck queued tasks, 
but it only ran once, on executor start. I have modified it to be safe to run 
periodically (by teaching it not to touch things that the executor looked at 
recently), and then made it run every so often (60 seconds by default).
    
    This is not a perfect fix - the only real fix would be to have far more 
detailed state tracking as part of TaskInstance or another table, and 
re-architect the KubernetesExecutor. However, this should reduce the number of 
times this happens very signficantly, so it should do for now.
---
 airflow/config_templates/config.yml          |  7 ++++
 airflow/config_templates/default_airflow.cfg |  3 ++
 airflow/executors/kubernetes_executor.py     | 49 +++++++++++++++++++---------
 airflow/kubernetes/kube_config.py            |  3 ++
 tests/executors/test_kubernetes_executor.py  |  2 +-
 5 files changed, 47 insertions(+), 17 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 4b9e342..9612727 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2224,6 +2224,13 @@
       type: integer
       example: ~
       default: "120"
+    - name: worker_pods_queued_check_interval
+      description: |
+        How often in seconds to check for task instances stuck in "queued" 
status without a pod
+      version_added: 2.2.0
+      type: integer
+      example: ~
+      default: "60"
     - name: worker_pods_pending_timeout_batch_size
       description: |
         How many pending pods to check for timeout violations in each check 
interval.
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 558e355..84af35c 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1097,6 +1097,9 @@ worker_pods_pending_timeout = 300
 # How often in seconds to check if Pending workers have exceeded their timeouts
 worker_pods_pending_timeout_check_interval = 120
 
+# How often in seconds to check for task instances stuck in "queued" status 
without a pod
+worker_pods_queued_check_interval = 60
+
 # How many pending pods to check for timeout violations in each check interval.
 # You may want this higher if you have a very large cluster and/or use 
``multi_namespace_mode``.
 worker_pods_pending_timeout_batch_size = 100
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 5e748da..8caf3ba 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -431,40 +431,48 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self.kube_client: Optional[client.CoreV1Api] = None
         self.scheduler_job_id: Optional[str] = None
         self.event_scheduler: Optional[EventScheduler] = None
+        self.last_handled: Dict[TaskInstanceKey, int] = {}
         super().__init__(parallelism=self.kube_config.parallelism)
 
     @provide_session
     def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
-        If the airflow scheduler restarts with pending "Queued" tasks, the 
tasks may or
-        may not
-        have been launched. Thus on starting up the scheduler let's check every
-        "Queued" task to
-        see if it has been launched (ie: if there is a corresponding pod on 
kubernetes)
-
-        If it has been launched then do nothing, otherwise reset the state to 
"None" so
-        the task
-        will be rescheduled
-
-        This will not be necessary in a future version of airflow in which 
there is
-        proper support
-        for State.LAUNCHED
+        Tasks can end up in a "Queued" state through either the executor being
+        abruptly shut down (leaving a non-empty task_queue on this executor)
+        or when a rescheduled/deferred operator comes back up for execution
+        (with the same try_number) before the pod of its previous incarnation
+        has been fully removed (we think).
+
+        This method checks each of those tasks to see if the corresponding pod
+        is around, and if not, and there's no matching entry in our own
+        task_queue, marks it for re-execution.
         """
         self.log.debug("Clearing tasks that have not been launched")
         if not self.kube_client:
             raise AirflowException(NOT_STARTED_MESSAGE)
         queued_tasks = session.query(TaskInstance).filter(TaskInstance.state 
== State.QUEUED).all()
-        self.log.info('When executor started up, found %s queued task 
instances', len(queued_tasks))
+        self.log.info('Found %s queued task instances', len(queued_tasks))
+
+        # Go through the "last seen" dictionary and clean out old entries
+        allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
+        for key, timestamp in list(self.last_handled.items()):
+            if time.time() - timestamp > allowed_age:
+                del self.last_handled[key]
 
         for task in queued_tasks:
 
             self.log.debug("Checking task %s", task)
+
+            # Check to see if we've handled it ourselves recently
+            if task.key in self.last_handled:
+                continue
+
+            # Build the pod selector
             dict_string = "dag_id={},task_id={},airflow-worker={}".format(
                 pod_generator.make_safe_label_value(task.dag_id),
                 pod_generator.make_safe_label_value(task.task_id),
                 
pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
             )
-
             kwargs = dict(label_selector=dict_string)
             if self.kube_config.kube_client_request_args:
                 kwargs.update(**self.kube_config.kube_client_request_args)
@@ -486,7 +494,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 TaskInstance.dag_id == task.dag_id,
                 TaskInstance.task_id == task.task_id,
                 TaskInstance.run_id == task.run_id,
-            ).update({TaskInstance.state: State.NONE})
+            ).update({TaskInstance.state: State.SCHEDULED})
 
     def start(self) -> None:
         """Starts the executor"""
@@ -504,6 +512,12 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             self.kube_config.worker_pods_pending_timeout_check_interval,
             self._check_worker_pods_pending_timeout,
         )
+        self.event_scheduler.call_regular_interval(
+            self.kube_config.worker_pods_queued_check_interval,
+            self.clear_not_launched_queued_tasks,
+        )
+        # We also call this at startup as that's the most likely time to see
+        # stuck queued tasks
         self.clear_not_launched_queued_tasks()
 
     def execute_async(
@@ -530,6 +544,9 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             raise AirflowException(NOT_STARTED_MESSAGE)
         self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id)
         self.task_queue.put((key, command, kube_executor_config, 
pod_template_file))
+        # We keep a temporary local record that we've handled this so we don't
+        # try and remove it from the QUEUED state while we process it
+        self.last_handled[key] = time.time()
 
     def sync(self) -> None:
         """Synchronize task state."""
diff --git a/airflow/kubernetes/kube_config.py 
b/airflow/kubernetes/kube_config.py
index ef32831..c85d7df 100644
--- a/airflow/kubernetes/kube_config.py
+++ b/airflow/kubernetes/kube_config.py
@@ -66,6 +66,9 @@ class KubeConfig:
         self.worker_pods_pending_timeout_batch_size = conf.getint(
             self.kubernetes_section, 'worker_pods_pending_timeout_batch_size'
         )
+        self.worker_pods_queued_check_interval = conf.getint(
+            self.kubernetes_section, 'worker_pods_queued_check_interval'
+        )
 
         kube_client_request_args = conf.get(self.kubernetes_section, 
'kube_client_request_args')
         if kube_client_request_args:
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 2fa0b8a..e7911ce 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -608,7 +608,7 @@ class TestKubernetesExecutor(unittest.TestCase):
             executor = KubernetesExecutor()
             executor.job_id = "123"
             executor.start()
-            assert 1 == len(executor.event_scheduler.queue)
+            assert 2 == len(executor.event_scheduler.queue)
             executor._check_worker_pods_pending_timeout()
 
         mock_kube_client.list_namespaced_pod.assert_called_once_with(

Reply via email to