This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 640d4f9636 Fix backfill queued task getting reset to scheduled state. (#23720)
640d4f9636 is described below

commit 640d4f9636d3867d66af2478bca15272811329da
Author: sanjayp <[email protected]>
AuthorDate: Fri Nov 18 06:39:31 2022 +0530

    Fix backfill queued task getting reset to scheduled state. (#23720)
---
 airflow/executors/kubernetes_executor.py    |  5 +++-
 tests/executors/test_kubernetes_executor.py | 46 +++++++++++++++++++++++++----
 2 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 8ff942ba92..72de3e050e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -467,7 +467,9 @@ class KubernetesExecutor(BaseExecutor):
         if not self.kube_client:
             raise AirflowException(NOT_STARTED_MESSAGE)
 
-        query = session.query(TaskInstance).filter(TaskInstance.state == 
State.QUEUED)
+        query = session.query(TaskInstance).filter(
+            TaskInstance.state == State.QUEUED, TaskInstance.queued_by_job_id 
== self.job_id
+        )
         if self.kubernetes_queue:
             query = query.filter(TaskInstance.queue == self.kubernetes_queue)
         queued_tis: list[TaskInstance] = query.all()
@@ -536,6 +538,7 @@ class KubernetesExecutor(BaseExecutor):
             self.kube_config.worker_pods_pending_timeout_check_interval,
             self._check_worker_pods_pending_timeout,
         )
+
         self.event_scheduler.call_regular_interval(
             self.kube_config.worker_pods_queued_check_interval,
             self.clear_not_launched_queued_tasks,
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 34f082a263..367f1cb2c4 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -160,7 +160,7 @@ class TestAirflowKubernetesScheduler:
         mock_kube_client.return_value.delete_namespaced_pod = 
mock_delete_namespace
 
         kube_executor = KubernetesExecutor()
-        kube_executor.job_id = "test-job-id"
+        kube_executor.job_id = 1
         kube_executor.start()
         kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
 
@@ -180,7 +180,7 @@ class TestAirflowKubernetesScheduler:
         # ApiException is raised because status is not 404
         mock_kube_client.return_value.delete_namespaced_pod.side_effect = 
ApiException(status=400)
         kube_executor = KubernetesExecutor()
-        kube_executor.job_id = "test-job-id"
+        kube_executor.job_id = 1
         kube_executor.start()
 
         with pytest.raises(ApiException):
@@ -201,7 +201,7 @@ class TestAirflowKubernetesScheduler:
         # ApiException not raised because the status is 404
         mock_kube_client.return_value.delete_namespaced_pod.side_effect = 
ApiException(status=404)
         kube_executor = KubernetesExecutor()
-        kube_executor.job_id = "test-job-id"
+        kube_executor.job_id = 1
         kube_executor.start()
 
         kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
@@ -729,7 +729,7 @@ class TestKubernetesExecutor:
         }
         with conf_vars(config):
             executor = KubernetesExecutor()
-            executor.job_id = "123"
+            executor.job_id = 123
             executor.start()
             assert 2 == len(executor.event_scheduler.queue)
             executor._check_worker_pods_pending_timeout()
@@ -771,7 +771,7 @@ class TestKubernetesExecutor:
         }
         with conf_vars(config):
             executor = KubernetesExecutor()
-            executor.job_id = "123"
+            executor.job_id = 123
             executor.start()
             executor._check_worker_pods_pending_timeout()
 
@@ -797,6 +797,7 @@ class TestKubernetesExecutor:
         session.flush()
 
         executor = self.kubernetes_executor
+        executor.job_id = 1
         executor.kube_client = mock_kube_client
         executor.clear_not_launched_queued_tasks(session=session)
 
@@ -840,6 +841,7 @@ class TestKubernetesExecutor:
         session.flush()
 
         executor = self.kubernetes_executor
+        executor.job_id = 1
         executor.kubernetes_queue = kubernetes_queue
         executor.kube_client = mock_kube_client
         executor.clear_not_launched_queued_tasks(session=session)
@@ -877,6 +879,7 @@ class TestKubernetesExecutor:
         session.flush()
 
         executor = self.kubernetes_executor
+        executor.job_id = 1
         executor.kube_client = mock_kube_client
         executor.clear_not_launched_queued_tasks(session=session)
 
@@ -922,7 +925,9 @@ class TestKubernetesExecutor:
         session.flush()
 
         executor = self.kubernetes_executor
+        executor.job_id = 1
         executor.kubernetes_queue = "kubernetes"
+
         executor.kube_client = mock_kube_client
         executor.clear_not_launched_queued_tasks(session=session)
 
@@ -930,6 +935,37 @@ class TestKubernetesExecutor:
         assert ti.state == State.QUEUED
         assert mock_kube_client.list_namespaced_pod.call_count == 0
 
+    def test_clear_not_launched_queued_tasks_clear_only_by_job_id(self, 
dag_maker, create_dummy_dag, session):
+        """clear only not launched queued  tasks which are queued by the same 
executor job"""
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value = 
k8s.V1PodList(items=[])
+
+        create_dummy_dag(dag_id="test_clear_0", task_id="task0", 
with_dagrun_type=None)
+        dag_run = dag_maker.create_dagrun()
+
+        ti0 = dag_run.task_instances[0]
+        ti0.state = State.QUEUED
+        ti0.queued_by_job_id = 1
+        session.flush()
+
+        create_dummy_dag(dag_id="test_clear_1", task_id="task1", 
with_dagrun_type=None)
+        dag_run = dag_maker.create_dagrun()
+
+        ti1 = dag_run.task_instances[0]
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 2
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.job_id = 1
+        executor.kube_client = mock_kube_client
+        executor.clear_not_launched_queued_tasks(session=session)
+
+        ti0.refresh_from_db()
+        ti1.refresh_from_db()
+        assert ti0.state == State.SCHEDULED
+        assert ti1.state == State.QUEUED
+
 
 class TestKubernetesJobWatcher:
     def setup_method(self):

Reply via email to