This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 640d4f9636 Fix backfill queued task getting reset to scheduled state. (#23720)
640d4f9636 is described below
commit 640d4f9636d3867d66af2478bca15272811329da
Author: sanjayp <[email protected]>
AuthorDate: Fri Nov 18 06:39:31 2022 +0530
Fix backfill queued task getting reset to scheduled state. (#23720)
---
airflow/executors/kubernetes_executor.py | 5 +++-
tests/executors/test_kubernetes_executor.py | 46 +++++++++++++++++++++++++----
2 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 8ff942ba92..72de3e050e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -467,7 +467,9 @@ class KubernetesExecutor(BaseExecutor):
if not self.kube_client:
raise AirflowException(NOT_STARTED_MESSAGE)
- query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED)
+ query = session.query(TaskInstance).filter(
+     TaskInstance.state == State.QUEUED, TaskInstance.queued_by_job_id == self.job_id
+ )
if self.kubernetes_queue:
query = query.filter(TaskInstance.queue == self.kubernetes_queue)
queued_tis: list[TaskInstance] = query.all()
@@ -536,6 +538,7 @@ class KubernetesExecutor(BaseExecutor):
self.kube_config.worker_pods_pending_timeout_check_interval,
self._check_worker_pods_pending_timeout,
)
+
self.event_scheduler.call_regular_interval(
self.kube_config.worker_pods_queued_check_interval,
self.clear_not_launched_queued_tasks,
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 34f082a263..367f1cb2c4 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -160,7 +160,7 @@ class TestAirflowKubernetesScheduler:
mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace
kube_executor = KubernetesExecutor()
- kube_executor.job_id = "test-job-id"
+ kube_executor.job_id = 1
kube_executor.start()
kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
@@ -180,7 +180,7 @@ class TestAirflowKubernetesScheduler:
# ApiException is raised because status is not 404
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=400)
kube_executor = KubernetesExecutor()
- kube_executor.job_id = "test-job-id"
+ kube_executor.job_id = 1
kube_executor.start()
with pytest.raises(ApiException):
@@ -201,7 +201,7 @@ class TestAirflowKubernetesScheduler:
# ApiException not raised because the status is 404
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=404)
kube_executor = KubernetesExecutor()
- kube_executor.job_id = "test-job-id"
+ kube_executor.job_id = 1
kube_executor.start()
kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
@@ -729,7 +729,7 @@ class TestKubernetesExecutor:
}
with conf_vars(config):
executor = KubernetesExecutor()
- executor.job_id = "123"
+ executor.job_id = 123
executor.start()
assert 2 == len(executor.event_scheduler.queue)
executor._check_worker_pods_pending_timeout()
@@ -771,7 +771,7 @@ class TestKubernetesExecutor:
}
with conf_vars(config):
executor = KubernetesExecutor()
- executor.job_id = "123"
+ executor.job_id = 123
executor.start()
executor._check_worker_pods_pending_timeout()
@@ -797,6 +797,7 @@ class TestKubernetesExecutor:
session.flush()
executor = self.kubernetes_executor
+ executor.job_id = 1
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)
@@ -840,6 +841,7 @@ class TestKubernetesExecutor:
session.flush()
executor = self.kubernetes_executor
+ executor.job_id = 1
executor.kubernetes_queue = kubernetes_queue
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)
@@ -877,6 +879,7 @@ class TestKubernetesExecutor:
session.flush()
executor = self.kubernetes_executor
+ executor.job_id = 1
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)
@@ -922,7 +925,9 @@ class TestKubernetesExecutor:
session.flush()
executor = self.kubernetes_executor
+ executor.job_id = 1
executor.kubernetes_queue = "kubernetes"
+
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)
@@ -930,6 +935,37 @@ class TestKubernetesExecutor:
assert ti.state == State.QUEUED
assert mock_kube_client.list_namespaced_pod.call_count == 0
+ def test_clear_not_launched_queued_tasks_clear_only_by_job_id(self, dag_maker, create_dummy_dag, session):
+     """clear only not launched queued tasks which are queued by the same executor job"""
+ mock_kube_client = mock.MagicMock()
+ mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
+
+ create_dummy_dag(dag_id="test_clear_0", task_id="task0", with_dagrun_type=None)
+ dag_run = dag_maker.create_dagrun()
+
+ ti0 = dag_run.task_instances[0]
+ ti0.state = State.QUEUED
+ ti0.queued_by_job_id = 1
+ session.flush()
+
+ create_dummy_dag(dag_id="test_clear_1", task_id="task1", with_dagrun_type=None)
+ dag_run = dag_maker.create_dagrun()
+
+ ti1 = dag_run.task_instances[0]
+ ti1.state = State.QUEUED
+ ti1.queued_by_job_id = 2
+ session.flush()
+
+ executor = self.kubernetes_executor
+ executor.job_id = 1
+ executor.kube_client = mock_kube_client
+ executor.clear_not_launched_queued_tasks(session=session)
+
+ ti0.refresh_from_db()
+ ti1.refresh_from_db()
+ assert ti0.state == State.SCHEDULED
+ assert ti1.state == State.QUEUED
+
class TestKubernetesJobWatcher:
def setup_method(self):