This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 69e4445b438 Fix monitoring-pod leak in KubernetesJobOperator (#67333)
69e4445b438 is described below
commit 69e4445b438c8b95acc0daf75a5d4187ee19b098
Author: Ville Jyrkkä <[email protected]>
AuthorDate: Fri May 29 14:43:02 2026 +0300
Fix monitoring-pod leak in KubernetesJobOperator (#67333)
* Fixing pod leak in KubernetesJobOperator (#1)
* fix(providers/cncf/kubernetes): clean up monitoring pods in
KubernetesJobOperator
KubernetesJobOperator inherited from KubernetesPodOperator but overrode
execute() without calling post_complete_action(), so the monitoring /
log-streaming pods discovered via get_pods() were never deleted. These
pods have no ownerReferences to the V1Job, so ttl_seconds_after_finished
and the Foreground cascade in on_kill don't reap them either.
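To make the ownership argument above concrete, here is a deliberately simplified model of cascading deletion. It assumes that a delete only propagates to objects whose `ownerReferences` lead back to the deleted owner. `Obj` and `cascade_delete` are hypothetical illustration names, not Kubernetes client APIs, and the real garbage collector has more moving parts (propagation policies, finalizers) than this sketch.

```python
from dataclasses import dataclass, field


@dataclass
class Obj:
    """Hypothetical stand-in for a Kubernetes object (not a real client model)."""

    kind: str
    name: str
    uid: str
    # UIDs listed in metadata.ownerReferences; empty means "no owner".
    owner_uids: list[str] = field(default_factory=list)


def cascade_delete(owner_uid: str, objects: list[Obj]) -> list[Obj]:
    """Return the objects that survive a cascading delete of ``owner_uid``.

    Simplified: anything whose ownerReferences chain reaches the deleted
    owner is removed; objects without such a reference are left alone.
    """
    doomed = {owner_uid}
    changed = True
    while changed:  # follow ownership transitively until nothing new is found
        changed = False
        for obj in objects:
            if obj.uid not in doomed and any(uid in doomed for uid in obj.owner_uids):
                doomed.add(obj.uid)
                changed = True
    return [obj for obj in objects if obj.uid not in doomed]


job = Obj("Job", "my-job", "job-uid")
owned_pod = Obj("Pod", "my-job-abc12", "pod-1", owner_uids=["job-uid"])
monitoring_pod = Obj("Pod", "monitor-xyz", "pod-2")  # no ownerReferences
survivors = cascade_delete(job.uid, [job, owned_pod, monitoring_pod])
print([obj.name for obj in survivors])  # ['monitor-xyz']
```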
- execute() and execute_complete() now wrap their work in try/finally and
call post_complete_action() for each pod in self.pods. on_finish_action
(delete_pod / delete_succeeded_pod / keep_pod) is now honoured.
- on_kill() additionally calls pod_manager.delete_pod() for each
monitoring pod (the Job's foreground cascade doesn't reach them).
- Per-pod cleanup errors are logged but never mask the in-flight
exception, so Job-level failures keep propagating.
- execute_complete() resolves monitoring pods once and shares the lookup
between the log-retrieval path and the cleanup path.
- Added unit tests, a bugfix newsfragment, and an operators.rst section
documenting the cleanup contract.
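The cleanup contract in the list above (always attempt cleanup, honour the finish action, log per-pod failures without masking the in-flight exception) can be sketched in plain Python. This is a minimal sketch under stated assumptions: `run_with_cleanup`, `should_delete`, and the `(name, phase)` pod tuples are hypothetical, and `delete_succeeded_pod` is keyed on the pod phase here, which simplifies the operator's actual decision.

```python
import logging
from collections.abc import Callable, Iterable
from typing import Optional, TypeVar

log = logging.getLogger(__name__)
T = TypeVar("T")


def should_delete(on_finish_action: str, pod_phase: Optional[str]) -> bool:
    """Simplified reading of the finish actions (hypothetical helper)."""
    if on_finish_action == "delete_pod":
        return True
    if on_finish_action == "delete_succeeded_pod":
        return pod_phase == "Succeeded"
    return False  # keep_pod (or anything unrecognised) keeps the pod


def run_with_cleanup(
    work: Callable[[], T],
    pods: Iterable[tuple[str, Optional[str]]],
    on_finish_action: str,
    delete_pod: Callable[[str], None],
) -> T:
    """Run ``work`` and always attempt pod cleanup afterwards."""
    try:
        return work()
    finally:
        for name, phase in pods:
            if not should_delete(on_finish_action, phase):
                continue
            try:
                delete_pod(name)
            except Exception:
                # Log and move on: a per-pod cleanup error must neither stop
                # cleanup of the remaining pods nor replace an exception that
                # is already propagating out of ``work``.
                log.warning("Error while cleaning up monitoring pod %s", name, exc_info=True)
```

With a job function that raises, the job's exception still propagates, and a failing delete for one pod does not stop deletion of the next.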
* Address code review feedback: remove dead PodNotFoundException check,
drop unused import, relax pod-deletion ordering in test, fix trailing comma
* Potential fix for pull request finding
In _cleanup_monitoring_pods, remote_pod is resolved via find_pod(), which
is designed to locate a single matching pod by task-instance labels and can
invoke duplicate-pod resolution logic (process_duplicate_label_pods). For
KubernetesJobOperator with parallelism > 1, this lookup can return the wrong
pod (or trigger duplicate-handling side effects), so post_complete_action() may
receive a mismatched remote_pod. Consider using the already-discovered pod’s
name/namespace to refresh state [...]
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
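A sketch of the suggested alternative, refreshing each already-discovered pod by its own name and namespace instead of a label search, might look like the following. `PodRef` and the `get_pod` callable are hypothetical stand-ins for a `V1Pod` and a by-name lookup; falling back to the cached object on any error is an assumption that mirrors the best-effort tone of the change.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class PodRef:
    """Hypothetical stand-in for the parts of a V1Pod used here."""

    name: Optional[str]
    namespace: Optional[str]
    phase: str = "Unknown"


def refresh_pod(pod: PodRef, get_pod: Callable[[str, str], Optional[PodRef]]) -> PodRef:
    """Re-read ``pod`` by its own name/namespace, falling back to the cached copy.

    An exact-name lookup cannot return a sibling pod, unlike a label search
    that may match several pods of the same Job when parallelism > 1.
    """
    if not (pod.name and pod.namespace):
        return pod
    try:
        return get_pod(pod.name, pod.namespace) or pod
    except Exception:
        # Best effort: a failed refresh should not block cleanup.
        return pod


# Two pods of one Job would share task-instance labels; the name keeps them apart.
cluster = {
    ("job-pod-a", "ns"): PodRef("job-pod-a", "ns", "Succeeded"),
    ("job-pod-b", "ns"): PodRef("job-pod-b", "ns", "Failed"),
}


def lookup(name: str, namespace: str) -> Optional[PodRef]:
    return cluster.get((name, namespace))


print(refresh_pod(PodRef("job-pod-b", "ns"), lookup).phase)  # Failed
```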
* Use isinstance(exc, TaskDeferred) instead of brittle string comparison
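Why `isinstance` is sturdier than comparing names: a subclass, or a renamed class, defeats a string check but still satisfies `isinstance`. The sketch below assumes CPython semantics, where `sys.exc_info()` called from a `finally` block (directly or in a helper it calls) reports the exception that is propagating. `TaskDeferred` here is a local stand-in, not Airflow's class, and `CustomDeferred`, `cleanup`, and `execute` are hypothetical.

```python
import sys
from typing import Optional


class TaskDeferred(Exception):
    """Local stand-in for Airflow's TaskDeferred, defined only for this sketch."""


class CustomDeferred(TaskDeferred):
    """Hypothetical subclass: a check on the class name would not recognise it."""


cleaned: list[str] = []


def cleanup(pods: list[str]) -> None:
    # When called from a ``finally`` block, sys.exc_info() reports the
    # exception that is propagating, or (None, None, None) if none is
    # (assuming no outer except block is active).
    exc = sys.exc_info()[1]
    if isinstance(exc, TaskDeferred):  # matches subclasses too
        return  # deferral: leave the pods for the resumed task
    cleaned.extend(pods)


def execute(pods: list[str], exc_to_raise: Optional[Exception] = None) -> str:
    try:
        if exc_to_raise is not None:
            raise exc_to_raise
        return "done"
    finally:
        cleanup(pods)


# The brittle alternative: comparing the class name misses the subclass.
print(type(CustomDeferred()).__name__ == "TaskDeferred")  # False
```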
* Potential fix for pull request finding
The new unit tests add several mock.MagicMock() instances (pods, jobs, TI,
etc.) without spec/autospec, and some patch() usages also create non-spec'd
mocks by default. Using autospec=True on patches and
create_autospec(...)/MagicMock(spec=...) for key Kubernetes objects helps catch
typos/attribute mismatches in these tests and aligns with Airflow’s test
hardening guidance.
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
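The difference autospec makes can be seen with the standard library alone. In this minimal sketch `PodManager` is a hypothetical stand-in class (not the provider's), while `mock.MagicMock` and `mock.create_autospec` are the real `unittest.mock` APIs: the loose mock silently accepts a misspelled method, whereas the autospecced one rejects it and also enforces the method signature.

```python
from unittest import mock


class PodManager:
    """Hypothetical stand-in for a real pod manager class."""

    def delete_pod(self, pod: str) -> None:
        raise NotImplementedError


# A bare MagicMock accepts any attribute, so a typo passes silently.
loose = mock.MagicMock()
loose.delete_podd("pod-1")  # no error: the test would not notice the typo

# create_autospec mirrors the real class: unknown attributes and wrong
# call signatures raise instead of being recorded.
strict = mock.create_autospec(PodManager, instance=True)
strict.deleted = []  # setting new attributes is still allowed (spec, not spec_set)
strict.delete_pod.side_effect = lambda pod: strict.deleted.append(pod)
strict.delete_pod("pod-1")

try:
    strict.delete_podd("pod-1")
except AttributeError:
    typo_caught = True
else:
    typo_caught = False

try:
    strict.delete_pod()  # missing the required ``pod`` argument
except TypeError:
    signature_checked = True
else:
    signature_checked = False

print(strict.deleted, typo_caught, signature_checked)  # ['pod-1'] True True
```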
* Address PR review comments: fix trigger pod_names, on_kill logging, and
test assertions
- triggers/job.py: Always include pod_names/pod_namespace in trigger event
regardless of get_logs setting, so execute_complete() can reliably clean
up monitoring pods even when get_logs=False
- operators/job.py: Log unexpected ApiException in on_kill() instead of
suppressing all ApiExceptions; remove unused `suppress` import
- tests/test_job.py: Rewrite test_execute_respects_keep_pod and
test_execute_deletes_pod_default to keep process_pod_deletion real and
assert on pod_manager.delete_pod; stub hook.get_pod for remote_pod
resolution
- tests/test_job.py: Add regression test for get_logs=False deferrable path
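The trigger-side change can be illustrated with a small, hypothetical pair of helpers: one shaping the event so the pod fields no longer depend on `get_logs`, and one showing how the resumed task can derive cleanup targets from it. `build_job_event` and `pods_to_clean` are illustrative names; only the keys mirror the event described here, and the success message is deliberately left out because it is not shown in this change.

```python
from typing import Any, Optional


def build_job_event(
    job_name: str,
    job_namespace: str,
    pod_names: list[str],
    pod_namespace: Optional[str],
    error_message: Optional[str] = None,
) -> dict[str, Any]:
    """Hypothetical helper shaping a trigger event like the one described above.

    ``pod_names``/``pod_namespace`` are always present, independent of
    whether logs were requested, so the resumed task can find pods to clean up.
    """
    return {
        "name": job_name,
        "namespace": job_namespace,
        "pod_names": list(pod_names),
        "pod_namespace": pod_namespace,
        "status": "error" if error_message else "success",
        # Only the error text is sketched; the success message is omitted.
        "message": f"Job failed with error: {error_message}" if error_message else None,
    }


def pods_to_clean(event: dict[str, Any]) -> list[tuple[str, str]]:
    """Pods the resumed task should clean up, falling back to the Job namespace."""
    namespace = event.get("pod_namespace") or event.get("namespace")
    if not namespace:
        return []
    return [(name, namespace) for name in event.get("pod_names") or []]
```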
* Fix orphaned test_on_kill_deletes_monitoring_pods method body after
accidental deletion of method signature
* Make pod resolution best-effort in execute_complete
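Best-effort resolution here means: pick a namespace from the most specific source available, fetch each reported pod once, and record lookup failures for name-based cleanup instead of raising. A minimal sketch, assuming plain dicts in place of `V1Pod` objects and a caller-supplied `get_pod` callable; `resolve_namespace`, `resolve_pods`, and `fallback_phase` are hypothetical helpers modelled on this description, not provider APIs.

```python
from typing import Any, Callable, Optional

Pod = dict  # hypothetical: a plain dict stands in for a fetched V1Pod


def resolve_namespace(event: dict[str, Any]) -> Optional[str]:
    """Prefer the pod namespace, then the event namespace, then the Job's own."""
    job = event.get("job")
    job_ns = job.get("metadata", {}).get("namespace") if isinstance(job, dict) else None
    return event.get("pod_namespace") or event.get("namespace") or job_ns


def resolve_pods(
    event: dict[str, Any], get_pod: Callable[[str, str], Optional[Pod]]
) -> tuple[dict[str, Pod], list[tuple[str, str]]]:
    """Fetch each reported pod once; never raise on lookup errors."""
    namespace = resolve_namespace(event)
    resolved: dict[str, Pod] = {}
    unresolved: list[tuple[str, str]] = []
    for name in event.get("pod_names") or []:
        if not namespace:
            continue  # nothing to look up against: skip rather than fail
        try:
            pod = get_pod(name, namespace)
        except Exception:
            # Remember the name so cleanup can still target it by name.
            unresolved.append((name, namespace))
            continue
        if pod is not None:
            resolved[name] = pod
    return resolved, unresolved


def fallback_phase(event_status: Optional[str]) -> Optional[str]:
    """Phase assumed for a pod that could not be fetched."""
    return {"success": "Succeeded", "error": "Failed"}.get(event_status)
```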
* Address remaining KubernetesJobOperator review comments
* Finalize review-comment fixes for KubernetesJobOperator
* Fix remaining KubernetesJobOperator review comments
* Update KubernetesJobOperator docs for action semantics
* Improve KubernetesJobOperator newsfragment readability
---------
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: Ville Jyrkkä <[email protected]>
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
* Apply ruff format
* Move provider changelog entry; drop newsfragment
* Apply prek docstring auto-fix (D212 multi-line summary)
* Drop changelog entry — added by release manager at release time
* Cover remaining KubernetesJobOperator monitoring-pod cleanup paths
* Consolidate PR tests: drop call-counting, dedupe, observe state
* Apply test review fixes: hoist imports, add state assertions, dedupe with
_pod helper
* Fix Copilot review: spec all MagicMock usages in KubernetesJobOperator
tests
- _recording_pod_manager(): use create_autospec(PodManager) instead of
MagicMock
- test error masking: use create_autospec(PodManager) for boom-raising pm
- test_execute_does_not_cleanup_when_deferring: use
create_autospec(KubernetesJobTrigger) for trigger
- test_execute_complete_pod_api_error_does_not_raise: use plain {} for
event["job"] field
- test_run_success_emits_pod_info_when_get_logs_false (triggers): use
create_autospec(k8s.V1Job)
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
providers/cncf/kubernetes/docs/operators.rst | 23 ++
.../providers/cncf/kubernetes/operators/job.py | 278 +++++++++++----
.../providers/cncf/kubernetes/triggers/job.py | 4 +-
.../unit/cncf/kubernetes/operators/test_job.py | 390 ++++++++++++++++++++-
.../unit/cncf/kubernetes/triggers/test_job.py | 34 ++
5 files changed, 643 insertions(+), 86 deletions(-)
diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst
index 72521af704f..a3f8e417237 100644
--- a/providers/cncf/kubernetes/docs/operators.rst
+++ b/providers/cncf/kubernetes/docs/operators.rst
@@ -713,6 +713,29 @@ It means that user can use all parameters from :class:`~airflow.providers.cncf.k
More information about the Jobs here: `Kubernetes Job Documentation
<https://kubernetes.io/docs/concepts/workloads/controllers/job/>`__
+Pod cleanup and ``on_finish_action``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When ``wait_until_job_complete=True``, the operator discovers Job pods via
+``get_pods()`` and streams logs/XCom from those pods while the Job runs.
+
+The inherited ``on_finish_action`` parameter controls what happens to these
+discovered pods at the end of the task:
+
+* ``delete_pod`` (default) — the pod is deleted after the task
+ finishes (success or failure).
+* ``delete_succeeded_pod`` — the pod is deleted only when the task
+ succeeded.
+* ``delete_active_pod`` — the pod is deleted only if it is still
+ active (``Pending`` or ``Running``).
+* ``keep_pod`` — the pod is kept (useful for offline log
+ inspection).
+
+When the task is killed, ``on_kill`` deletes the Job (with foreground cascade).
+For discovered pods, deletion is controlled by ``on_kill_action``:
+``delete_pod`` attempts direct pod deletion and ``keep_pod`` skips it.
+
+
.. _howto/operator:KubernetesDeleteJobOperator:
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 510346d34ad..0e8540d815c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -22,6 +22,7 @@ import copy
import json
import logging
import os
+import sys
import warnings
from collections.abc import Sequence
from functools import cached_property
@@ -41,9 +42,9 @@ from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, merge_objects
from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
-from airflow.providers.cncf.kubernetes.utils.pod_manager import EMPTY_XCOM_RESULT, PodNotFoundException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import EMPTY_XCOM_RESULT, OnKillAction
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
-from airflow.providers.common.compat.sdk import AirflowException, conf
+from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred, conf
from airflow.utils import yaml
if AIRFLOW_V_3_1_PLUS:
@@ -218,44 +219,49 @@ class KubernetesJobOperator(KubernetesPodOperator):
ti.xcom_push(key="job_name", value=self.job.metadata.name)
ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
- if self.wait_until_job_complete:
- self.pods: Sequence[k8s.V1Pod] = self.get_pods(
- pod_request_obj=self.pod_request_obj, context=context
- )
+ try:
+ if self.wait_until_job_complete:
+ self.pods: Sequence[k8s.V1Pod] = self.get_pods(
+ pod_request_obj=self.pod_request_obj, context=context
+ )
- if self.deferrable:
- self.execute_deferrable()
- return
+ if self.deferrable:
+ self.execute_deferrable()
+ # execute_deferrable raises TaskDeferred; cleanup is handled
+ # by execute_complete on resume.
+ return
+
+ if self.do_xcom_push:
+ xcom_result = []
+ for pod in self.pods:
+ self.pod_manager.await_container_completion(
+ pod=pod, container_name=self.base_container_name
+ )
+ self.pod_manager.await_xcom_sidecar_container_start(pod=pod)
+ xcom_result.append(self.extract_xcom(pod=pod))
+ self.job = self.hook.wait_until_job_complete(
+ job_name=self.job.metadata.name,
+ namespace=self.job.metadata.namespace,
+ job_poll_interval=self.job_poll_interval,
+ )
+ if self.get_logs:
+ for pod in self.pods:
+ self.pod_manager.fetch_requested_container_logs(
+ pod=pod,
+ containers=self.container_logs,
+ follow_logs=True,
+ )
- if self.do_xcom_push:
- xcom_result = []
- for pod in self.pods:
- self.pod_manager.await_container_completion(
- pod=pod, container_name=self.base_container_name
- )
- self.pod_manager.await_xcom_sidecar_container_start(pod=pod)
- xcom_result.append(self.extract_xcom(pod=pod))
- self.job = self.hook.wait_until_job_complete(
- job_name=self.job.metadata.name,
- namespace=self.job.metadata.namespace,
- job_poll_interval=self.job_poll_interval,
- )
- if self.get_logs:
- for pod in self.pods:
- self.pod_manager.fetch_requested_container_logs(
- pod=pod,
- containers=self.container_logs,
- follow_logs=True,
+ ti.xcom_push(key="job", value=self.job.to_dict())
+ if self.wait_until_job_complete:
+ if error_message := self.hook.is_job_failed(job=self.job):
+ raise AirflowException(
+ f"Kubernetes job '{self.job.metadata.name}' is failed with error '{error_message}'"
)
-
- ti.xcom_push(key="job", value=self.job.to_dict())
- if self.wait_until_job_complete:
- if error_message := self.hook.is_job_failed(job=self.job):
- raise AirflowException(
- f"Kubernetes job '{self.job.metadata.name}' is failed with error '{error_message}'"
- )
- if self.do_xcom_push:
- return xcom_result[0] if self.unwrap_single and len(xcom_result) == 1 else xcom_result
+ if self.do_xcom_push:
+ return xcom_result[0] if self.unwrap_single and len(xcom_result) == 1 else xcom_result
+ finally:
+ self._cleanup_monitoring_pods(context)
def execute_deferrable(self):
self.defer(
@@ -277,39 +283,81 @@ class KubernetesJobOperator(KubernetesPodOperator):
)
def execute_complete(self, context: Context, event: dict, **kwargs):
- ti = context["ti"]
- ti.xcom_push(key="job", value=event["job"])
- if event["status"] == "error":
- raise AirflowException(event["message"])
-
- if self.get_logs:
- for pod_name in event["pod_names"]:
- pod_namespace = event["pod_namespace"]
- try:
- pod = self.hook.get_pod(pod_name, pod_namespace)
- except ApiException as e:
- if e.status == 404:
- self.log.warning(
- "Pod %s in namespace %s not found (possibly
deleted). Skipping log retrieval.",
- pod_name,
- pod_namespace,
- )
+ # Resolve monitoring pods up front so the log-retrieval path and the
+ # cleanup path in the finally block share the same lookup (no double
+ # ``hook.get_pod`` calls).
+ pods_by_name: dict[str, k8s.V1Pod] = {}
+ event_job = event.get("job")
+ job_namespace = (
+ event_job.get("metadata", {}).get("namespace") if isinstance(event_job, dict) else None
+ )
+ pod_namespace = event.get("pod_namespace") or event.get("namespace") or job_namespace
+ unresolved_pods: list[tuple[str, str]] = []
+ for pod_name in event.get("pod_names") or []:
+ if not pod_namespace:
+ self.log.warning(
+ "Skipping pod %s lookup because no pod namespace was provided in trigger event.",
+ pod_name,
+ )
+ continue
+ try:
+ pod = self.hook.get_pod(pod_name, pod_namespace)
+ except ApiException as e:
+ if e.status == 404:
+ self.log.warning(
+ "Pod %s in namespace %s not found (possibly deleted).",
+ pod_name,
+ pod_namespace,
+ )
+ else:
+ self.log.warning(
+ "Failed to retrieve pod %s in namespace %s: %s. Skipping.",
+ pod_name,
+ pod_namespace,
+ e,
+ )
+ unresolved_pods.append((pod_name, pod_namespace))
+ continue
+ except Exception as e:
+ self.log.warning(
+ "Failed to retrieve pod %s in namespace %s: %s. Skipping.",
+ pod_name,
+ pod_namespace,
+ e,
+ )
+ unresolved_pods.append((pod_name, pod_namespace))
+ continue
+ if pod is not None:
+ pods_by_name[pod_name] = pod
+
+ try:
+ ti = context["ti"]
+ ti.xcom_push(key="job", value=event["job"])
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+
+ if self.get_logs:
+ for pod_name in event.get("pod_names") or []:
+ if pod_name not in pods_by_name:
+ # Pod was reported by the trigger but missing now (e.g. 404)
+ self.log.warning("Skipping log retrieval for pod %s (not found).", pod_name)
+ continue
+ self._write_logs(pods_by_name[pod_name])
+
+ if self.do_xcom_push:
+ xcom_results: list[Any | None] = []
+ for xcom_result in event["xcom_result"]:
+ if isinstance(xcom_result, str) and xcom_result.rstrip() == EMPTY_XCOM_RESULT:
+ self.log.info("xcom result file is empty.")
+ xcom_results.append(None)
continue
- raise
- if not pod:
- raise PodNotFoundException("Could not find pod after resuming from deferral")
- self._write_logs(pod)
-
- if self.do_xcom_push:
- xcom_results: list[Any | None] = []
- for xcom_result in event["xcom_result"]:
- if isinstance(xcom_result, str) and xcom_result.rstrip() == EMPTY_XCOM_RESULT:
- self.log.info("xcom result file is empty.")
- xcom_results.append(None)
- continue
- self.log.info("xcom result: \n%s", xcom_result)
- xcom_results.append(json.loads(xcom_result))
- return xcom_results[0] if self.unwrap_single and len(xcom_results) == 1 else xcom_results
+ self.log.info("xcom result: \n%s", xcom_result)
+ xcom_results.append(json.loads(xcom_result))
+ return xcom_results[0] if self.unwrap_single and len(xcom_results) == 1 else xcom_results
+ finally:
+ self._cleanup_monitoring_pods_from_dict(
+ context, pods_by_name, unresolved_pods=unresolved_pods, event_status=event.get("status")
+ )
@staticmethod
def deserialize_job_template_file(path: str) -> k8s.V1Job:
@@ -334,6 +382,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
return api_client._ApiClient__deserialize_model(job, k8s.V1Job)
def on_kill(self) -> None:
+ self._killed = True
if self.job:
job = self.job
kwargs = {
@@ -344,6 +393,97 @@ class KubernetesJobOperator(KubernetesPodOperator):
if self.termination_grace_period is not None:
kwargs.update(grace_period_seconds=self.termination_grace_period)
self.job_client.delete_namespaced_job(**kwargs)
+ if self.on_kill_action == OnKillAction.KEEP_POD:
+ self.log.info(
+ "Skipping monitoring pod deletion since on_kill_action is set to %r.",
+ self.on_kill_action.value,
+ )
+ return
+ # Monitoring pods discovered via get_pods() have no ownerReferences and
+ # are not reaped by the Job's foreground cascade. Delete them directly.
+ for pod in getattr(self, "pods", None) or []:
+ try:
+ self.pod_manager.delete_pod(pod)
+ except ApiException:
+ self.log.exception(
+ "Unable to delete monitoring pod %s",
+ getattr(pod.metadata, "name", "<unknown>"),
+ )
+
+ def _cleanup_monitoring_pods(self, context: Context) -> None:
+ """
+ Run ``post_complete_action`` on each monitoring pod from ``self.pods``.
+
+ Honours ``on_finish_action`` (inherited from ``KubernetesPodOperator``)
+ and runs as a side-effect: any per-pod cleanup error is logged but never
+ masks the in-flight exception (e.g. an ``AirflowException`` raised because
+ the Job itself failed).
+ """
+ # Skip cleanup when control is leaving execute() via TaskDeferred: the
+ # deferred trigger still needs the monitoring pods to exist; the pods
+ # will be cleaned up by execute_complete() on resume.
+ exc = sys.exc_info()[1]
+ if isinstance(exc, TaskDeferred):
+ return
+ for pod in getattr(self, "pods", None) or []:
+ remote_pod = pod
+ try:
+ pod_name = getattr(pod.metadata, "name", None)
+ pod_namespace = getattr(pod.metadata, "namespace", None)
+ if pod_name and pod_namespace:
+ remote_pod = self.hook.get_pod(name=pod_name, namespace=pod_namespace) or pod
+ except Exception:
+ remote_pod = pod
+ try:
+ self.post_complete_action(
+ pod=pod,
+ remote_pod=remote_pod,
+ context=context,
+ result=None,
+ )
+ except Exception:
+ # cleanup() can raise AirflowException for failed pods, and the
+ # k8s client can raise transport errors. For the Job operator we
+ # prefer the Job-level failure (or the original exception) to
+ # propagate instead of any per-pod cleanup error.
+ self.log.warning(
+ "Error while cleaning up monitoring pod %s",
+ getattr(pod.metadata, "name", "<unknown>"),
+ exc_info=True,
+ )
+
+ def _cleanup_monitoring_pods_from_dict(
+ self,
+ context: Context,
+ pods_by_name: dict[str, k8s.V1Pod],
+ *,
+ unresolved_pods: list[tuple[str, str]] | None = None,
+ event_status: str | None = None,
+ ) -> None:
+ """
+ Run ``post_complete_action`` on each pod previously resolved via the trigger event.
+
+ Same semantics as :meth:`_cleanup_monitoring_pods` - errors are logged
+ but never mask the in-flight exception.
+ """
+ for pod_name, pod in pods_by_name.items():
+ try:
+ self.post_complete_action(pod=pod, remote_pod=pod, context=context, result=None)
+ except Exception:
+ self.log.warning(
+ "Error while cleaning up monitoring pod %s",
+ pod_name,
+ exc_info=True,
+ )
+ pod_phase = (
+ "Succeeded" if event_status == "success" else "Failed" if event_status == "error" else None
+ )
+ for pod_name, pod_namespace in unresolved_pods or []:
+ fallback_pod = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name=pod_name, namespace=pod_namespace),
+ status=k8s.V1PodStatus(phase=pod_phase),
+ )
+ self.process_pod_deletion(fallback_pod, reraise=False)
def build_job_request_obj(self, context: Context | None = None) -> k8s.V1Job:
"""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
index b60373c2d53..271099598f6 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
@@ -145,8 +145,8 @@ class KubernetesJobTrigger(BaseTrigger):
{
"name": job.metadata.name,
"namespace": job.metadata.namespace,
- "pod_names": [pod_name for pod_name in self.pod_names] if self.get_logs else None,
- "pod_namespace": self.pod_namespace if self.get_logs else None,
+ "pod_names": list(self.pod_names),
+ "pod_namespace": self.pod_namespace,
"status": "error" if error_message else "success",
"message": f"Job failed with error: {error_message}"
if error_message
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index 6bde7c4772d..a0f3049cce5 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -25,6 +25,7 @@ from unittest.mock import patch
import pendulum
import pytest
from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client.rest import ApiException
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import DAG, DagModel, DagRun, TaskInstance
@@ -33,7 +34,9 @@ from airflow.providers.cncf.kubernetes.operators.job import (
KubernetesJobOperator,
KubernetesPatchJobOperator,
)
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
@@ -105,6 +108,25 @@ def create_context(task, persist_to_db=False, map_index=None):
}
+def _recording_pod_manager():
+ """A ``pod_manager`` mock that records ``delete_pod`` invocations by pod name.
+
+ Tests should assert against ``pm.deleted`` to observe which monitoring pods
+ were actually deleted — the only thing that matters to the cluster.
+ """
+ pm = mock.create_autospec(PodManager, instance=True)
+ pm.deleted = []
+ pm.delete_pod.side_effect = lambda pod: pm.deleted.append(pod.metadata.name)
+ return pm
+
+
+def _pod(name, phase="Succeeded", namespace=POD_NAMESPACE):
+ """Build a real ``V1Pod`` with the minimum fields ``cleanup`` needs."""
+ pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name=name, namespace=namespace))
+ pod.status = k8s.V1PodStatus(phase=phase, container_statuses=[])
+ return pod
+
+
@pytest.mark.db_test
@pytest.mark.execution_timeout(300)
class TestKubernetesJobOperator:
@@ -883,29 +905,73 @@ class TestKubernetesJobOperator:
mocked_write_logs.assert_not_called()
@pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
- def test_execute_complete_pod_api_error_reraises(self, mock_hook, mocked_write_logs):
- """Non-404 ApiExceptions should still be raised."""
- from kubernetes.client.rest import ApiException
+ def test_execute_complete_pod_api_error_does_not_raise(
+ self, mock_hook, mocked_write_logs, mock_pod_manager_prop
+ ):
+ """Non-404 errors fetching the monitoring pod must not fail ``execute_complete``.
- mock_ti = mock.MagicMock()
- context = {"ti": mock_ti}
- mock_job = mock.MagicMock()
+ Cleanup is best-effort: log writing is skipped, but the pod is still
+ targeted for deletion by name so it doesn't outlive the job.
+ """
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ mock_hook.get_pod.side_effect = ApiException(status=403, reason="Forbidden")
event = {
- "job": mock_job,
+ "job": {},
"status": "success",
"pod_names": [POD_NAME],
"pod_namespace": POD_NAMESPACE,
"xcom_result": None,
}
- mock_hook.get_pod.side_effect = ApiException(status=403, reason="Forbidden")
+ # Must return normally — any raise here is a regression.
+ KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+ context={"ti": mock.create_autospec(TaskInstance, instance=True)}, event=event
+ )
- with pytest.raises(ApiException):
- KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
- context=context, event=event
- )
+ # Best-effort cleanup: pod is still scheduled for deletion by name.
+ assert pm.deleted == [POD_NAME]
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+ def test_execute_complete_uses_event_namespace_fallback(
+ self, mock_hook, mocked_write_logs, mock_pod_manager_prop
+ ):
+ """``pod_namespace`` falls back to ``event["namespace"]`` when absent.
+
+ The monitoring pod must be fetched and deleted from that namespace.
+ """
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ pod = _pod(POD_NAME, namespace=JOB_NAMESPACE)
+
+ fetched_namespaces: list[str] = []
+
+ def get_pod(name, namespace):
+ fetched_namespaces.append(namespace)
+ return pod
+
+ mock_hook.get_pod.side_effect = get_pod
+
+ event = {
+ "job": {"metadata": {"name": JOB_NAME, "namespace": JOB_NAMESPACE}},
+ "namespace": JOB_NAMESPACE,
+ "status": "success",
+ "pod_names": [POD_NAME],
+ "xcom_result": None,
+ }
+
+ KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+ context={"ti": mock.create_autospec(TaskInstance, instance=True)}, event=event
+ )
+
+ assert fetched_namespaces == [JOB_NAMESPACE]
+ assert pm.deleted == [POD_NAME]
@pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
@@ -944,7 +1010,7 @@ class TestKubernetesJobOperator:
@pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
def test_on_kill(self, mock_client):
- mock_job = mock.MagicMock()
+ mock_job = mock.create_autospec(k8s.V1Job, instance=True)
mock_job.metadata.name = JOB_NAME
mock_job.metadata.namespace = JOB_NAMESPACE
@@ -952,6 +1018,7 @@ class TestKubernetesJobOperator:
op.job = mock_job
op.on_kill()
+ assert op._killed is True
mock_client.delete_namespaced_job.assert_called_once_with(
name=JOB_NAME,
namespace=JOB_NAMESPACE,
@@ -961,7 +1028,7 @@ class TestKubernetesJobOperator:
@pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
def test_on_kill_termination_grace_period(self, mock_client):
- mock_job = mock.MagicMock()
+ mock_job = mock.create_autospec(k8s.V1Job, instance=True)
mock_job.metadata.name = JOB_NAME
mock_job.metadata.namespace = JOB_NAMESPACE
mock_termination_grace_period = mock.MagicMock()
@@ -991,6 +1058,26 @@ class TestKubernetesJobOperator:
mock_client.delete_namespaced_job.assert_not_called()
mock_serialize.assert_not_called()
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
+ def test_on_kill_respects_keep_pod_action(self, mock_client, mock_pod_manager_prop):
+ """With ``on_kill_action="keep_pod"`` the job is deleted but monitoring pods survive."""
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ mock_job = mock.create_autospec(k8s.V1Job, instance=True)
+ mock_job.metadata.name = JOB_NAME
+ mock_job.metadata.namespace = JOB_NAMESPACE
+ pod = _pod(POD_NAME)
+
+ op = KubernetesJobOperator(task_id="test_task_id", on_kill_action="keep_pod")
+ op.job = mock_job
+ op.pods = [pod]
+ op.on_kill()
+
+ assert op._killed is True
+ assert pm.deleted == []
+
@pytest.mark.parametrize("parallelism", [1, 2])
@pytest.mark.parametrize("do_xcom_push", [True, False])
@pytest.mark.parametrize("get_logs", [True, False])
@@ -1193,6 +1280,279 @@ class TestKubernetesJobOperator:
mock_hook.return_value.create_job.assert_not_called()
mock_get_pods.assert_not_called()
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ @patch(HOOK_CLASS)
+ def test_execute_cleans_up_all_pods_on_success(
+ self,
+ mock_hook,
+ mock_create_job,
+ mock_build_job_request_obj,
+ mock_get_pods,
+ mock_find_pod,
+ mock_pod_manager_prop,
+ ):
+ """Every monitoring pod is deleted after a successful job, including under ``parallelism>1``.
+
+ Real ``post_complete_action`` runs; only the K8s SDK boundary is faked.
+ """
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ mock_hook.return_value.is_job_failed.return_value = False
+ pod_1 = _pod("pod-1")
+ pod_2 = _pod("pod-2")
+ mock_get_pods.return_value = [pod_1, pod_2]
+ mock_find_pod.side_effect = [pod_1, pod_2]
+ mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+ pod_1 if name == "pod-1" else pod_2
+ )
+
+ op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True, parallelism=2)
+ op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+ assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ @patch(HOOK_CLASS)
+ def test_execute_cleans_up_pod_on_failure(
+ self,
+ mock_hook,
+ mock_create_job,
+ mock_build_job_request_obj,
+ mock_get_pods,
+ mock_find_pod,
+ mock_pod_manager_prop,
+ ):
+ """When the job fails, the monitoring pod is still deleted and the error propagates."""
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ mock_hook.return_value.is_job_failed.return_value = "Error"
+ pod = _pod("pod-1", phase="Failed")
+ mock_get_pods.return_value = [pod]
+ mock_find_pod.return_value = pod
+ mock_hook.return_value.get_pod.return_value = pod
+
+ op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True)
+ with pytest.raises(AirflowException, match="is failed with error"):
+ op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+ assert pm.deleted == ["pod-1"]
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ @patch(HOOK_CLASS)
+ def test_execute_keep_pod_skips_cleanup(
+ self,
+ mock_hook,
+ mock_create_job,
+ mock_build_job_request_obj,
+ mock_get_pods,
+ mock_pod_manager_prop,
+ ):
+ """``on_finish_action=keep_pod`` leaves the monitoring pod untouched."""
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ mock_hook.return_value.is_job_failed.return_value = False
+ pod = _pod("pod-1")
+ mock_get_pods.return_value = [pod]
+ mock_hook.return_value.get_pod.return_value = pod
+
+ op = KubernetesJobOperator(
+ task_id="test_task_id", wait_until_job_complete=True, on_finish_action="keep_pod"
+ )
+ op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+ assert pm.deleted == []
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ @patch(HOOK_CLASS)
+ def test_execute_cleanup_error_on_success_does_not_raise(
+ self,
+ mock_hook,
+ mock_create_job,
+ mock_build_job_request_obj,
+ mock_get_pods,
+ mock_find_pod,
+ mock_pod_manager_prop,
+ ):
+ """A delete failure on one pod must not stop cleanup of the others nor fail the task.
+
+ Exercises the real ``post_complete_action`` against a recording pod_manager
+ whose first ``delete_pod`` raises at the K8s SDK boundary.
+ """
+ mock_hook.return_value.is_job_failed.return_value = False
+ pod_1 = _pod("pod-1")
+ pod_2 = _pod("pod-2")
+ mock_get_pods.return_value = [pod_1, pod_2]
+ mock_find_pod.side_effect = [pod_1, pod_2]
+ mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+ pod_1 if name == "pod-1" else pod_2
+ )
+ pm = _recording_pod_manager()
+ original_side_effect = pm.delete_pod.side_effect
+
+ def delete_side_effect(pod):
+ if pod.metadata.name == "pod-1":
+ raise ApiException(status=500, reason="boom")
+ original_side_effect(pod)
+
+ pm.delete_pod.side_effect = delete_side_effect
+ mock_pod_manager_prop.return_value = pm
+
+ op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True, parallelism=2)
+ op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+ # pod-1's delete blew up; pod-2 still got deleted.
+ assert pm.deleted == ["pod-2"]
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ @patch(HOOK_CLASS)
+ def test_execute_cleanup_error_does_not_mask_job_failure(
+ self,
+ mock_hook,
+ mock_create_job,
+ mock_build_job_request_obj,
+ mock_get_pods,
+ mock_find_pod,
+ mock_pod_manager_prop,
+ ):
+ """When the job already failed, a cleanup error must not replace the job-level error.
+
+ The recording side_effect lets us observe that cleanup was *attempted*
+ even when it raises — the contract is "attempt cleanup, swallow any
+ error, preserve the original job failure".
+ """
+ mock_hook.return_value.is_job_failed.return_value = "Error"
+ pod = _pod("pod-1", phase="Failed")
+ mock_get_pods.return_value = [pod]
+ mock_find_pod.return_value = pod
+ mock_hook.return_value.get_pod.return_value = pod
+ attempted: list[str] = []
+
+ def boom(pod):
+ attempted.append(pod.metadata.name)
+ raise ApiException(status=500, reason="boom")
+
+ pm = mock.create_autospec(PodManager, instance=True)
+ pm.delete_pod.side_effect = boom
+ mock_pod_manager_prop.return_value = pm
+
+ op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True)
+ # The job-level error wins; the cleanup ApiException is swallowed.
+ with pytest.raises(AirflowException, match="is failed with error"):
+ op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+ # Cleanup was still attempted even though it raised.
+ assert attempted == ["pod-1"]
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.execute_deferrable"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ @patch(HOOK_CLASS)
+ def test_execute_does_not_cleanup_when_deferring(
+ self,
+ mock_hook,
+ mock_create_job,
+ mock_build_job_request_obj,
+ mock_get_pods,
+ mock_execute_deferrable,
+ mock_pod_manager_prop,
+ ):
+ """A ``TaskDeferred`` on the way out of ``execute()`` must leave pods alive.
+
+ The trigger still needs to watch them; cleanup happens on resume in
+ ``execute_complete``.
+ """
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ pod = _pod("pod-1")
+ mock_get_pods.return_value = [pod]
+ mock_execute_deferrable.side_effect = TaskDeferred(
+ trigger=mock.create_autospec(KubernetesJobTrigger, instance=True), method_name="execute_complete"
+ )
+
+ op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True, deferrable=True)
+ with pytest.raises(TaskDeferred):
+ op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+ assert pm.deleted == []
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(HOOK_CLASS)
+ def test_execute_complete_cleans_up_pods(self, mock_hook, mock_pod_manager_prop):
+ """Deferrable resume path cleans up every monitoring pod listed in the event.
+
+ Covers ``parallelism>1`` and the original leak vector ``get_logs=False``.
+ Real ``post_complete_action`` runs; only the K8s SDK boundary is faked.
+ """
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ pod_1 = _pod("pod-1")
+ pod_2 = _pod("pod-2")
+ mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+ pod_1 if name == "pod-1" else pod_2
+ )
+ event = {
+ "status": "success",
+ "message": "ok",
+ "job": {"metadata": {"name": JOB_NAME, "namespace": JOB_NAMESPACE}},
+ "pod_names": ["pod-1", "pod-2"],
+ "pod_namespace": POD_NAMESPACE,
+ "xcom_result": None,
+ }
+
+ KubernetesJobOperator(task_id="test_task_id", get_logs=False).execute_complete(
+ context={"ti": mock.create_autospec(TaskInstance, instance=True)}, event=event
+ )
+
+ assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
+ def test_on_kill_deletes_monitoring_pods(self, mock_client, mock_pod_manager_prop):
+ """``on_kill`` deletes the job and every monitoring pod owned by the operator."""
+ pm = _recording_pod_manager()
+ mock_pod_manager_prop.return_value = pm
+ mock_job = mock.create_autospec(k8s.V1Job, instance=True)
+ mock_job.metadata.name = JOB_NAME
+ mock_job.metadata.namespace = JOB_NAMESPACE
+ pod_1 = _pod("pod-1")
+ pod_2 = _pod("pod-2")
+
+ op = KubernetesJobOperator(task_id="test_task_id")
+ op.job = mock_job
+ op.pods = [pod_1, pod_2]
+ op.on_kill()
+
+ assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
@pytest.mark.db_test
@pytest.mark.execution_timeout(300)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py
index 4f6f6597fcf..95d16776b81 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py
@@ -20,6 +20,7 @@ from __future__ import annotations
from unittest import mock
import pytest
+from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
from airflow.triggers.base import TriggerEvent
@@ -120,6 +121,39 @@ class TestKubernetesJobTrigger:
}
)
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_CLASS}.hook")
+ async def test_run_success_emits_pod_info_when_get_logs_false(self, mock_hook):
+ """pod_names/pod_namespace must be in the event even with get_logs=False.
+
+ The operator's execute_complete needs them to clean up monitoring pods.
+ """
+ trigger_no_logs = KubernetesJobTrigger(
+ job_name=JOB_NAME,
+ job_namespace=NAMESPACE,
+ pod_names=[POD_NAME],
+ pod_namespace=NAMESPACE,
+ base_container_name=CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_file=CONFIG_FILE,
+ in_cluster=IN_CLUSTER,
+ get_logs=False,
+ do_xcom_push=XCOM_PUSH,
+ )
+ mock_job = mock.create_autospec(k8s.V1Job, instance=True)
+ mock_job.metadata.name = JOB_NAME
+ mock_job.metadata.namespace = NAMESPACE
+ mock_hook.wait_until_job_complete.side_effect = mock.AsyncMock(return_value=mock_job)
+ mock_hook.is_job_failed.return_value = False
+
+ event_actual = await trigger_no_logs.run().asend(None)
+
+ assert event_actual.payload["pod_names"] == [POD_NAME]
+ assert event_actual.payload["pod_namespace"] == NAMESPACE
+ assert event_actual.payload["status"] == "success"
+
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_CLASS}.hook")
async def test_run_fail(self, mock_hook, trigger):