This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5efcb992e0 Rename most pod_id usage to pod_name in KubernetesExecutor
(#29147)
5efcb992e0 is described below
commit 5efcb992e0692c69ba48c3b2f4144c6670162927
Author: Jed Cunningham <[email protected]>
AuthorDate: Sat Apr 22 15:28:29 2023 -0500
Rename most pod_id usage to pod_name in KubernetesExecutor (#29147)
We were using pod_id in a lot of places, where really it is just the pod
name. I've renamed it, where it is easy to do so, so things are easier
to follow.
---
airflow/executors/kubernetes_executor.py | 90 +++++++++++++++--------------
tests/executors/test_kubernetes_executor.py | 56 +++++++++---------
2 files changed, 76 insertions(+), 70 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py
b/airflow/executors/kubernetes_executor.py
index 8e4ac0dac0..f8e816e9f5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -58,10 +58,10 @@ if TYPE_CHECKING:
# TaskInstance key, command, configuration, pod_template_file
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
- # key, pod state, pod_id, namespace, resource_version
+ # key, pod state, pod_name, namespace, resource_version
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str,
str]
- # pod_id, namespace, pod state, annotations, resource_version
+ # pod_name, namespace, pod state, annotations, resource_version
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
ALL_NAMESPACES = "ALL_NAMESPACES"
@@ -180,7 +180,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
task_instance_related_annotations["map_index"] = map_index
self.process_status(
- pod_id=task.metadata.name,
+ pod_name=task.metadata.name,
namespace=task.metadata.namespace,
status=task.status.phase,
annotations=task_instance_related_annotations,
@@ -208,7 +208,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
def process_status(
self,
- pod_id: str,
+ pod_name: str,
namespace: str,
status: str,
annotations: dict[str, str],
@@ -218,28 +218,28 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
"""Process status response."""
if status == "Pending":
if event["type"] == "DELETED":
- self.log.info("Event: Failed to start pod %s", pod_id)
- self.watcher_queue.put((pod_id, namespace, State.FAILED,
annotations, resource_version))
+ self.log.info("Event: Failed to start pod %s", pod_name)
+ self.watcher_queue.put((pod_name, namespace, State.FAILED,
annotations, resource_version))
else:
- self.log.debug("Event: %s Pending", pod_id)
+ self.log.debug("Event: %s Pending", pod_name)
elif status == "Failed":
- self.log.error("Event: %s Failed", pod_id)
- self.watcher_queue.put((pod_id, namespace, State.FAILED,
annotations, resource_version))
+ self.log.error("Event: %s Failed", pod_name)
+ self.watcher_queue.put((pod_name, namespace, State.FAILED,
annotations, resource_version))
elif status == "Succeeded":
- self.log.info("Event: %s Succeeded", pod_id)
- self.watcher_queue.put((pod_id, namespace, State.SUCCESS,
annotations, resource_version))
+ self.log.info("Event: %s Succeeded", pod_name)
+ self.watcher_queue.put((pod_name, namespace, State.SUCCESS,
annotations, resource_version))
elif status == "Running":
if event["type"] == "DELETED":
- self.log.info("Event: Pod %s deleted before it could
complete", pod_id)
- self.watcher_queue.put((pod_id, namespace, State.FAILED,
annotations, resource_version))
+ self.log.info("Event: Pod %s deleted before it could
complete", pod_name)
+ self.watcher_queue.put((pod_name, namespace, State.FAILED,
annotations, resource_version))
else:
- self.log.info("Event: %s is Running", pod_id)
+ self.log.info("Event: %s is Running", pod_name)
else:
self.log.warning(
"Event: Invalid state: %s on pod: %s in namespace %s with
annotations: %s with "
"resource_version: %s",
status,
- pod_id,
+ pod_name,
namespace,
annotations,
resource_version,
@@ -368,12 +368,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
self.log.debug("Kubernetes Job created!")
- def delete_pod(self, pod_id: str, namespace: str) -> None:
- """Deletes POD."""
+ def delete_pod(self, pod_name: str, namespace: str) -> None:
+ """Deletes Pod from a namespace. Does not raise if it does not
exist."""
try:
- self.log.debug("Deleting pod %s in namespace %s", pod_id,
namespace)
+ self.log.debug("Deleting pod %s in namespace %s", pod_name,
namespace)
self.kube_client.delete_namespaced_pod(
- pod_id,
+ pod_name,
namespace,
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
**self.kube_config.kube_client_request_args,
@@ -419,14 +419,14 @@ class AirflowKubernetesScheduler(LoggingMixin):
def process_watcher_task(self, task: KubernetesWatchType) -> None:
"""Process the task by watcher."""
- pod_id, namespace, state, annotations, resource_version = task
+ pod_name, namespace, state, annotations, resource_version = task
self.log.debug(
- "Attempting to finish pod; pod_id: %s; state: %s; annotations:
%s", pod_id, state, annotations
+ "Attempting to finish pod; pod_name: %s; state: %s; annotations:
%s", pod_name, state, annotations
)
key = annotations_to_key(annotations=annotations)
if key:
- self.log.debug("finishing job %s - %s (%s)", key, state, pod_id)
- self.result_queue.put((key, state, pod_id, namespace,
resource_version))
+ self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
+ self.result_queue.put((key, state, pod_name, namespace,
resource_version))
def _flush_watcher_queue(self) -> None:
self.log.debug("Executor shutting down, watcher_queue approx.
size=%d", self.watcher_queue.qsize())
@@ -658,11 +658,11 @@ class KubernetesExecutor(BaseExecutor):
try:
results = self.result_queue.get_nowait()
try:
- key, state, pod_id, namespace, resource_version = results
+ key, state, pod_name, namespace, resource_version = results
last_resource_version[namespace] = resource_version
self.log.info("Changing state of %s to %s", results, state)
try:
- self._change_state(key, state, pod_id, namespace)
+ self._change_state(key, state, pod_name, namespace)
except Exception as e:
self.log.exception(
"Exception: %s when attempting to change state of
%s to %s, re-queueing.",
@@ -725,7 +725,7 @@ class KubernetesExecutor(BaseExecutor):
next_event = self.event_scheduler.run(blocking=False)
self.log.debug("Next timed event is in %f", next_event)
- def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id:
str, namespace: str) -> None:
+ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_name:
str, namespace: str) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler
@@ -735,10 +735,10 @@ class KubernetesExecutor(BaseExecutor):
if self.kube_config.delete_worker_pods:
if state != State.FAILED or
self.kube_config.delete_worker_pods_on_failure:
- self.kube_scheduler.delete_pod(pod_id, namespace)
+ self.kube_scheduler.delete_pod(pod_name=pod_name,
namespace=namespace)
self.log.info("Deleted pod: %s in namespace %s", str(key),
str(namespace))
else:
- self.kube_scheduler.patch_pod_executor_done(pod_name=pod_id,
namespace=namespace)
+ self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name,
namespace=namespace)
self.log.info("Patched pod %s in namespace %s to mark it as done",
str(key), str(namespace))
try:
@@ -801,9 +801,10 @@ class KubernetesExecutor(BaseExecutor):
return messages, ["\n".join(log)]
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) ->
Sequence[TaskInstance]:
+ # Always flush TIs without queued_by_job_id
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
- pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+ tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id =
pod_generator.make_safe_label_value(str(scheduler_job_id))
@@ -821,9 +822,9 @@ class KubernetesExecutor(BaseExecutor):
}
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
- self.adopt_launched_task(kube_client, pod, pod_ids)
+ self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
self._adopt_completed_pods(kube_client)
- tis_to_flush.extend(pod_ids.values())
+ tis_to_flush.extend(tis_to_flush_by_key.values())
return tis_to_flush
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
@@ -861,26 +862,29 @@ class KubernetesExecutor(BaseExecutor):
self.log.warning("Found multiple pods for ti %s: %s", ti,
pod_list)
continue
readable_tis.append(repr(ti))
- self.kube_scheduler.delete_pod(pod_id=pod_list[0].metadata.name,
namespace=namespace)
+ self.kube_scheduler.delete_pod(pod_name=pod_list[0].metadata.name,
namespace=namespace)
return readable_tis
def adopt_launched_task(
- self, kube_client: client.CoreV1Api, pod: k8s.V1Pod, pod_ids:
dict[TaskInstanceKey, k8s.V1Pod]
+ self,
+ kube_client: client.CoreV1Api,
+ pod: k8s.V1Pod,
+ tis_to_flush_by_key: dict[TaskInstanceKey, k8s.V1Pod],
) -> None:
"""
Patch existing pod so that the current KubernetesJobWatcher can
monitor it via label selectors.
:param kube_client: kubernetes client for speaking to kube API
:param pod: V1Pod spec that we will patch with new label
- :param pod_ids: pod_ids we expect to patch.
+ :param tis_to_flush_by_key: TIs that will be flushed if they aren't
adopted
"""
if TYPE_CHECKING:
assert self.scheduler_job_id
self.log.info("attempting to adopt pod %s", pod.metadata.name)
- pod_id = annotations_to_key(pod.metadata.annotations)
- if pod_id not in pod_ids:
- self.log.error("attempting to adopt taskinstance which was not
specified by database: %s", pod_id)
+ ti_key = annotations_to_key(pod.metadata.annotations)
+ if ti_key not in tis_to_flush_by_key:
+ self.log.error("attempting to adopt taskinstance which was not
specified by database: %s", ti_key)
return
new_worker_id_label =
pod_generator.make_safe_label_value(self.scheduler_job_id)
@@ -894,8 +898,8 @@ class KubernetesExecutor(BaseExecutor):
self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
return
- del pod_ids[pod_id]
- self.running.add(pod_id)
+ del tis_to_flush_by_key[ti_key]
+ self.running.add(ti_key)
def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
@@ -925,8 +929,8 @@ class KubernetesExecutor(BaseExecutor):
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
- pod_id = annotations_to_key(pod.metadata.annotations)
- self.running.add(pod_id)
+ ti_id = annotations_to_key(pod.metadata.annotations)
+ self.running.add(ti_id)
def _flush_task_queue(self) -> None:
if TYPE_CHECKING:
@@ -952,12 +956,12 @@ class KubernetesExecutor(BaseExecutor):
results = self.result_queue.get_nowait()
self.log.warning("Executor shutting down, flushing
results=%s", results)
try:
- key, state, pod_id, namespace, resource_version = results
+ key, state, pod_name, namespace, resource_version = results
self.log.info(
"Changing state of %s to %s : resource_version=%d",
results, state, resource_version
)
try:
- self._change_state(key, state, pod_id, namespace)
+ self._change_state(key, state, pod_name, namespace)
except Exception as e:
self.log.exception(
"Ignoring exception: %s when attempting to change
state of %s to %s.",
diff --git a/tests/executors/test_kubernetes_executor.py
b/tests/executors/test_kubernetes_executor.py
index c2b7d24c55..94e092b447 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -159,7 +159,7 @@ class TestAirflowKubernetesScheduler:
@mock.patch("airflow.executors.kubernetes_executor.client")
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
def test_delete_pod_successfully(self, mock_watcher, mock_client,
mock_kube_client):
- pod_id = "my-pod-1"
+ pod_name = "my-pod-1"
namespace = "my-namespace-1"
mock_delete_namespace = mock.MagicMock()
@@ -169,8 +169,8 @@ class TestAirflowKubernetesScheduler:
kube_executor.job_id = 1
kube_executor.start()
try:
- kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
- mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
+ kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+ mock_delete_namespace.assert_called_with(pod_name, namespace,
body=mock_client.V1DeleteOptions())
finally:
kube_executor.end()
@@ -181,7 +181,7 @@ class TestAirflowKubernetesScheduler:
@mock.patch("airflow.executors.kubernetes_executor.client")
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
def test_delete_pod_raises_404(self, mock_watcher, mock_client,
mock_kube_client):
- pod_id = "my-pod-1"
+ pod_name = "my-pod-1"
namespace = "my-namespace-2"
mock_delete_namespace = mock.MagicMock()
@@ -194,8 +194,8 @@ class TestAirflowKubernetesScheduler:
kube_executor.start()
with pytest.raises(ApiException):
- kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
- mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
+ kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+ mock_delete_namespace.assert_called_with(pod_name, namespace,
body=mock_client.V1DeleteOptions())
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package
is not installed"
@@ -204,7 +204,7 @@ class TestAirflowKubernetesScheduler:
@mock.patch("airflow.executors.kubernetes_executor.client")
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
def test_delete_pod_404_not_raised(self, mock_watcher, mock_client,
mock_kube_client):
- pod_id = "my-pod-1"
+ pod_name = "my-pod-1"
namespace = "my-namespace-3"
mock_delete_namespace = mock.MagicMock()
@@ -216,8 +216,8 @@ class TestAirflowKubernetesScheduler:
kube_executor.job_id = 1
kube_executor.start()
try:
- kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
- mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
+ kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+ mock_delete_namespace.assert_called_with(pod_name, namespace,
body=mock_client.V1DeleteOptions())
finally:
kube_executor.end()
@@ -531,7 +531,7 @@ class TestKubernetesExecutor:
try:
key = ("dag_id", "task_id", "run_id", "try_number1")
executor.running = {key}
- executor._change_state(key, State.RUNNING, "pod_id", "default")
+ executor._change_state(key, State.RUNNING, "pod_name", "default")
assert executor.event_buffer[key][0] == State.RUNNING
assert executor.running == {key}
finally:
@@ -546,10 +546,10 @@ class TestKubernetesExecutor:
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
executor.running = {key}
- executor._change_state(key, State.SUCCESS, "pod_id", "default")
+ executor._change_state(key, State.SUCCESS, "pod_name", "default")
assert executor.event_buffer[key][0] == State.SUCCESS
assert executor.running == set()
- mock_delete_pod.assert_called_once_with("pod_id", "default")
+ mock_delete_pod.assert_called_once_with(pod_name="pod_name",
namespace="default")
finally:
executor.end()
@@ -615,11 +615,11 @@ class TestKubernetesExecutor:
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
executor.running = {key}
- executor._change_state(key, State.SUCCESS, "pod_id",
"test-namespace")
+ executor._change_state(key, State.SUCCESS, "pod_name",
"test-namespace")
assert executor.event_buffer[key][0] == State.SUCCESS
assert executor.running == set()
mock_delete_pod.assert_not_called()
- mock_patch_pod.assert_called_once_with(pod_name="pod_id",
namespace="test-namespace")
+ mock_patch_pod.assert_called_once_with(pod_name="pod_name",
namespace="test-namespace")
finally:
executor.end()
@@ -638,10 +638,10 @@ class TestKubernetesExecutor:
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
executor.running = {key}
- executor._change_state(key, State.FAILED, "pod_id",
"test-namespace")
+ executor._change_state(key, State.FAILED, "pod_name",
"test-namespace")
assert executor.event_buffer[key][0] == State.FAILED
assert executor.running == set()
- mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+ mock_delete_pod.assert_called_once_with(pod_name="pod_name",
namespace="test-namespace")
mock_patch_pod.assert_not_called()
finally:
executor.end()
@@ -683,8 +683,10 @@ class TestKubernetesExecutor:
mock_ti.queued_by_job_id = "10" # scheduler_job would have updated
this after the first adoption
executor.scheduler_job_id = "20"
- # assume success adopting when checking return, `adopt_launched_task`
pops `ti_key` from `pod_ids`
- mock_adopt_launched_task.side_effect = lambda client, pod, pod_ids:
pod_ids.pop(ti_key)
+ # assume success adopting, `adopt_launched_task` pops `ti_key` from
`tis_to_flush_by_key`
+ mock_adopt_launched_task.side_effect = (
+ lambda client, pod, tis_to_flush_by_key:
tis_to_flush_by_key.pop(ti_key)
+ )
reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
@@ -757,15 +759,15 @@ class TestKubernetesExecutor:
pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(name="foo", labels={"airflow-worker":
"bar"}, annotations=annotations)
)
- pod_ids = {ti_key: {}}
+ tis_to_flush_by_key = {ti_key: {}}
- executor.adopt_launched_task(mock_kube_client, pod=pod,
pod_ids=pod_ids)
+ executor.adopt_launched_task(mock_kube_client, pod=pod,
tis_to_flush_by_key=tis_to_flush_by_key)
mock_kube_client.patch_namespaced_pod.assert_called_once_with(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name="foo",
namespace=None,
)
- assert pod_ids == {}
+ assert tis_to_flush_by_key == {}
assert executor.running == {ti_key}
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -781,16 +783,16 @@ class TestKubernetesExecutor:
}
ti_key = annotations_to_key(annotations)
pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo",
annotations=annotations))
- pod_ids = {ti_key: {}}
+ tis_to_flush_by_key = {ti_key: {}}
mock_kube_client.patch_namespaced_pod.side_effect =
ApiException(status=400)
- executor.adopt_launched_task(mock_kube_client, pod=pod,
pod_ids=pod_ids)
+ executor.adopt_launched_task(mock_kube_client, pod=pod,
tis_to_flush_by_key=tis_to_flush_by_key)
mock_kube_client.patch_namespaced_pod.assert_called_once_with(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name="foo",
namespace=None,
)
- assert pod_ids == {ti_key: {}}
+ assert tis_to_flush_by_key == {ti_key: {}}
assert executor.running == set()
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -852,7 +854,7 @@ class TestKubernetesExecutor:
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
- pod_ids = {"foobar": {}}
+ tis_to_flush_by_key = {"foobar": {}}
pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name="foo",
@@ -865,9 +867,9 @@ class TestKubernetesExecutor:
},
)
)
- executor.adopt_launched_task(mock_kube_client, pod=pod,
pod_ids=pod_ids)
+ executor.adopt_launched_task(mock_kube_client, pod=pod,
tis_to_flush_by_key=tis_to_flush_by_key)
assert not mock_kube_client.patch_namespaced_pod.called
- assert pod_ids == {"foobar": {}}
+ assert tis_to_flush_by_key == {"foobar": {}}
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")