This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f64ac5978f Be more selective when adopting pods with
KubernetesExecutor (#28899)
f64ac5978f is described below
commit f64ac5978fb3dfa9e40a0e5190ef88e9f9615824
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Jan 18 14:05:50 2023 -0600
Be more selective when adopting pods with KubernetesExecutor (#28899)
* Be more selective when adopting pods with KubernetesExecutor
When trying to adopt "resettable" TIs from SchedulerJob, we should not
list out all the pods to compare against, only those that didn't
succeed. This means we will get any pods that are still starting,
running, or failed (meaning the TI wasn't moved to a terminal state
there, and will be in our "adoptable" list).
This avoids the scenario where a dead scheduler has both a completed,
successful worker, and a still running worker, causing log lines
like these about the successful one:
ERROR - attempting to adopt taskinstance which was not specified by
database: TaskInstanceKey(...)
This also makes sure we only find pods with the
`kubernetes_executor=True` label for extra safety.
Closes #28071
* Also ignore done pods
---
airflow/executors/kubernetes_executor.py | 26 +++++++++++++++++++-------
tests/executors/test_kubernetes_executor.py | 20 ++++++++++++++++----
2 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py
b/airflow/executors/kubernetes_executor.py
index 4bd0de7e29..040ca21856 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -496,13 +496,14 @@ class KubernetesExecutor(BaseExecutor):
"""
Clear tasks that were not yet launched, but were previously queued.
- Tasks can end up in a "Queued" state through either the executor being
- abruptly shut down (leaving a non-empty task_queue on this executor)
- or when a rescheduled/deferred operator comes back up for execution
- (with the same try_number) before the pod of its previous incarnation
- has been fully removed (we think).
+ Tasks can end up in a "Queued" state when a
rescheduled/deferred
+ operator comes back up for execution (with the same try_number) before
the
+ pod of its previous incarnation has been fully removed (we think).
- This method checks each of those tasks to see if the corresponding pod
+ It's also possible when an executor abruptly shuts down (leaving a
non-empty
+ task_queue on that executor), but that scenario is handled via normal
adoption.
+
+ This method checks each of our queued tasks to see if the
corresponding pod
is around, and if not, and there's no matching entry in our own
task_queue, marks it for re-execution.
"""
@@ -779,7 +780,18 @@ class KubernetesExecutor(BaseExecutor):
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id =
pod_generator.make_safe_label_value(str(scheduler_job_id))
- query_kwargs = {"label_selector":
f"airflow-worker={scheduler_job_id}"}
+ # We will look for any pods owned by the no-longer-running
scheduler,
+ # but will exclude only successful pods, as those TIs will have a
terminal state
+ # and not be up for adoption!
+ # Those workers that failed, however, are okay to adopt here as
their TI will
+ # still be in queued.
+ query_kwargs = {
+ "field_selector": "status.phase!=Succeeded",
+ "label_selector": (
+ "kubernetes_executor=True,"
+
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
+ ),
+ }
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.adopt_launched_task(kube_client, pod, pod_ids)
diff --git a/tests/executors/test_kubernetes_executor.py
b/tests/executors/test_kubernetes_executor.py
index 72052622a7..0ca2cd3a96 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -654,7 +654,9 @@ class TestKubernetesExecutor:
# First adoption
reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
- namespace="default", label_selector="airflow-worker=1"
+ namespace="default",
+ field_selector="status.phase!=Succeeded",
+
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
)
mock_adopt_launched_task.assert_called_once_with(mock_kube_client,
pod, {ti_key: mock_ti})
mock_adopt_completed_pods.assert_called_once()
@@ -672,7 +674,9 @@ class TestKubernetesExecutor:
reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
- namespace="default", label_selector="airflow-worker=10"
+ namespace="default",
+ field_selector="status.phase!=Succeeded",
+
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
)
mock_adopt_launched_task.assert_called_once() # Won't check args this
time around as they get mutated
mock_adopt_completed_pods.assert_called_once()
@@ -695,8 +699,16 @@ class TestKubernetesExecutor:
assert mock_kube_client.list_namespaced_pod.call_count == 2
mock_kube_client.list_namespaced_pod.assert_has_calls(
[
- mock.call(namespace="default",
label_selector="airflow-worker=10"),
- mock.call(namespace="default",
label_selector="airflow-worker=40"),
+ mock.call(
+ namespace="default",
+ field_selector="status.phase!=Succeeded",
+
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
+ ),
+ mock.call(
+ namespace="default",
+ field_selector="status.phase!=Succeeded",
+
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
+ ),
],
any_order=True,
)