This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 69e4445b438 Fix monitoring-pod leak in KubernetesJobOperator (#67333)
69e4445b438 is described below

commit 69e4445b438c8b95acc0daf75a5d4187ee19b098
Author: Ville Jyrkkä <[email protected]>
AuthorDate: Fri May 29 14:43:02 2026 +0300

    Fix monitoring-pod leak in KubernetesJobOperator (#67333)
    
    * Fixing pod leak in KubernetesJobOperator (#1)
    
    * fix(providers/cncf/kubernetes): clean up monitoring pods in KubernetesJobOperator
    
    KubernetesJobOperator inherited from KubernetesPodOperator but overrode
    execute() without calling post_complete_action(), so the monitoring /
    log-streaming pods discovered via get_pods() were never deleted. These
    pods have no ownerReferences to the V1Job, so ttl_seconds_after_finished
    and the Foreground cascade in on_kill don't reap them either.
    
    - execute() and execute_complete() now wrap their work in try/finally and
      call post_complete_action() for each pod in self.pods. on_finish_action
      (delete_pod / delete_succeeded_pod / keep_pod) is now honoured.
    - on_kill() additionally calls pod_manager.delete_pod() for each
      monitoring pod (the Job's foreground cascade doesn't reach them).
    - Per-pod cleanup errors are logged but never mask the in-flight
      exception, so Job-level failures keep propagating.
    - execute_complete() resolves monitoring pods once and shares the lookup
      between the log-retrieval path and the cleanup path.
    - Added unit tests, a bugfix newsfragment, and an operators.rst section
      documenting the cleanup contract.
    
    * Address code review feedback: remove dead PodNotFoundException check, drop unused import, relax pod-deletion ordering in test, fix trailing comma
    
    * Potential fix for pull request finding
    
    In _cleanup_monitoring_pods, remote_pod is resolved via find_pod(), which is designed to locate a single matching pod by task-instance labels and can invoke duplicate-pod resolution logic (process_duplicate_label_pods). For KubernetesJobOperator with parallelism > 1, this lookup can return the wrong pod (or trigger duplicate-handling side effects), so post_complete_action() may receive a mismatched remote_pod. Consider using the already-discovered pod’s name/namespace to refresh state [...]
    
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    
    * Use isinstance(exc, TaskDeferred) instead of brittle string comparison
    
    * Potential fix for pull request finding
    
    The new unit tests add several mock.MagicMock() instances (pods, jobs, TI, etc.) without spec/autospec, and some patch() usages also create non-spec'd mocks by default. Using autospec=True on patches and create_autospec(...)/MagicMock(spec=...) for key Kubernetes objects helps catch typos/attribute mismatches in these tests and aligns with Airflow’s test hardening guidance.
    
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    
    * Address PR review comments: fix trigger pod_names, on_kill logging, and test assertions
    
    - triggers/job.py: Always include pod_names/pod_namespace in trigger event
      regardless of get_logs setting, so execute_complete() can reliably clean
      up monitoring pods even when get_logs=False
    - operators/job.py: Log unexpected ApiException in on_kill() instead of
      suppressing all ApiExceptions; remove unused `suppress` import
    - tests/test_job.py: Rewrite test_execute_respects_keep_pod and
      test_execute_deletes_pod_default to keep process_pod_deletion real and
      assert on pod_manager.delete_pod; stub hook.get_pod for remote_pod resolution
    - tests/test_job.py: Add regression test for get_logs=False deferrable path
    
    * Fix orphaned test_on_kill_deletes_monitoring_pods method body after accidental deletion of method signature
    
    * Make pod resolution best-effort in execute_complete
    
    * Address remaining KubernetesJobOperator review comments
    
    * Finalize review-comment fixes for KubernetesJobOperator
    
    * Fix remaining KubernetesJobOperator review comments
    
    * Update KubernetesJobOperator docs for action semantics
    
    * Improve KubernetesJobOperator newsfragment readability
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: Ville Jyrkkä <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    
    * Apply ruff format
    
    * Move provider changelog entry; drop newsfragment
    
    * Apply prek docstring auto-fix (D212 multi-line summary)
    
    * Drop changelog entry — added by release manager at release time
    
    * Cover remaining KubernetesJobOperator monitoring-pod cleanup paths
    
    * Consolidate PR tests: drop call-counting, dedupe, observe state
    
    * Apply test review fixes: hoist imports, add state assertions, dedupe with _pod helper
    
    * Fix Copilot review: spec all MagicMock usages in KubernetesJobOperator tests
    
    - _recording_pod_manager(): use create_autospec(PodManager) instead of MagicMock
    - test error masking: use create_autospec(PodManager) for boom-raising pm
    - test_execute_does_not_cleanup_when_deferring: use create_autospec(KubernetesJobTrigger) for trigger
    - test_execute_complete_pod_api_error_does_not_raise: use plain {} for event["job"] field
    - test_run_success_emits_pod_info_when_get_logs_false (triggers): use create_autospec(k8s.V1Job)
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 providers/cncf/kubernetes/docs/operators.rst       |  23 ++
 .../providers/cncf/kubernetes/operators/job.py     | 278 +++++++++++----
 .../providers/cncf/kubernetes/triggers/job.py      |   4 +-
 .../unit/cncf/kubernetes/operators/test_job.py     | 390 ++++++++++++++++++++-
 .../unit/cncf/kubernetes/triggers/test_job.py      |  34 ++
 5 files changed, 643 insertions(+), 86 deletions(-)
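
The cleanup contract described in the commit message (wrap the work in try/finally, skip cleanup when the task is deferring, and log per-pod cleanup errors without masking the in-flight exception) can be illustrated with a minimal, standalone sketch. This is not the operator's code: `Deferred`, `cleanup_pods`, `run_job`, and the `delete_pod` callable are hypothetical stand-ins chosen for illustration, and pods are represented as plain strings.

```python
import logging
import sys

log = logging.getLogger("cleanup_sketch")


class Deferred(BaseException):
    """Hypothetical stand-in for a 'task is deferring' signal."""


def cleanup_pods(pods, delete_pod):
    """Best-effort cleanup: per-pod errors are logged and never raised."""
    # Inside a ``finally`` block that runs because an exception is
    # propagating, sys.exc_info() exposes that in-flight exception.
    if isinstance(sys.exc_info()[1], Deferred):
        return  # pods are still needed; cleanup happens after resume
    for pod in pods:
        try:
            delete_pod(pod)
        except Exception:
            log.warning("Error while cleaning up pod %s", pod, exc_info=True)


def run_job(pods, work, delete_pod):
    """Run ``work`` and always attempt cleanup afterwards."""
    try:
        return work()
    finally:
        cleanup_pods(pods, delete_pod)
```

Under these assumptions, an exception raised by `work()` still reaches the caller even if deleting one of the pods fails, which is the property the commit message describes for Job-level failures.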

diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst
index 72521af704f..a3f8e417237 100644
--- a/providers/cncf/kubernetes/docs/operators.rst
+++ b/providers/cncf/kubernetes/docs/operators.rst
@@ -713,6 +713,29 @@ It means that user can use all parameters from :class:`~airflow.providers.cncf.k
 
 More information about the Jobs here: `Kubernetes Job Documentation <https://kubernetes.io/docs/concepts/workloads/controllers/job/>`__
 
+Pod cleanup and ``on_finish_action``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When ``wait_until_job_complete=True``, the operator discovers Job pods via
+``get_pods()`` and streams logs/XCom from those pods while the Job runs.
+
+The inherited ``on_finish_action`` parameter controls what happens to these
+discovered pods at the end of the task:
+
+* ``delete_pod`` (default) — the pod is deleted after the task
+  finishes (success or failure).
+* ``delete_succeeded_pod`` — the pod is deleted only when the task
+  succeeded.
+* ``delete_active_pod`` — the pod is deleted only if it is still
+  active (``Pending`` or ``Running``).
+* ``keep_pod`` — the pod is kept (useful for offline log
+  inspection).
+
+When the task is killed, ``on_kill`` deletes the Job (with foreground cascade).
+For discovered pods, deletion is controlled by ``on_kill_action``:
+``delete_pod`` attempts direct pod deletion and ``keep_pod`` skips it.
+
+
 
 .. _howto/operator:KubernetesDeleteJobOperator:
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 510346d34ad..0e8540d815c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -22,6 +22,7 @@ import copy
 import json
 import logging
 import os
+import sys
 import warnings
 from collections.abc import Sequence
 from functools import cached_property
@@ -41,9 +42,9 @@ from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, merge_objects
 from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
-from airflow.providers.cncf.kubernetes.utils.pod_manager import EMPTY_XCOM_RESULT, PodNotFoundException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import EMPTY_XCOM_RESULT, OnKillAction
 from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
-from airflow.providers.common.compat.sdk import AirflowException, conf
+from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred, conf
 from airflow.utils import yaml
 
 if AIRFLOW_V_3_1_PLUS:
@@ -218,44 +219,49 @@ class KubernetesJobOperator(KubernetesPodOperator):
         ti.xcom_push(key="job_name", value=self.job.metadata.name)
         ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
-        if self.wait_until_job_complete:
-            self.pods: Sequence[k8s.V1Pod] = self.get_pods(
-                pod_request_obj=self.pod_request_obj, context=context
-            )
+        try:
+            if self.wait_until_job_complete:
+                self.pods: Sequence[k8s.V1Pod] = self.get_pods(
+                    pod_request_obj=self.pod_request_obj, context=context
+                )
 
-            if self.deferrable:
-                self.execute_deferrable()
-                return
+                if self.deferrable:
+                    self.execute_deferrable()
+                    # execute_deferrable raises TaskDeferred; cleanup is handled
+                    # by execute_complete on resume.
+                    return
+
+                if self.do_xcom_push:
+                    xcom_result = []
+                    for pod in self.pods:
+                        self.pod_manager.await_container_completion(
+                            pod=pod, container_name=self.base_container_name
+                        )
+                        self.pod_manager.await_xcom_sidecar_container_start(pod=pod)
+                        xcom_result.append(self.extract_xcom(pod=pod))
+                self.job = self.hook.wait_until_job_complete(
+                    job_name=self.job.metadata.name,
+                    namespace=self.job.metadata.namespace,
+                    job_poll_interval=self.job_poll_interval,
+                )
+                if self.get_logs:
+                    for pod in self.pods:
+                        self.pod_manager.fetch_requested_container_logs(
+                            pod=pod,
+                            containers=self.container_logs,
+                            follow_logs=True,
+                        )
 
-            if self.do_xcom_push:
-                xcom_result = []
-                for pod in self.pods:
-                    self.pod_manager.await_container_completion(
-                        pod=pod, container_name=self.base_container_name
-                    )
-                    self.pod_manager.await_xcom_sidecar_container_start(pod=pod)
-                    xcom_result.append(self.extract_xcom(pod=pod))
-            self.job = self.hook.wait_until_job_complete(
-                job_name=self.job.metadata.name,
-                namespace=self.job.metadata.namespace,
-                job_poll_interval=self.job_poll_interval,
-            )
-            if self.get_logs:
-                for pod in self.pods:
-                    self.pod_manager.fetch_requested_container_logs(
-                        pod=pod,
-                        containers=self.container_logs,
-                        follow_logs=True,
+            ti.xcom_push(key="job", value=self.job.to_dict())
+            if self.wait_until_job_complete:
+                if error_message := self.hook.is_job_failed(job=self.job):
+                    raise AirflowException(
+                        f"Kubernetes job '{self.job.metadata.name}' is failed with error '{error_message}'"
                     )
-
-        ti.xcom_push(key="job", value=self.job.to_dict())
-        if self.wait_until_job_complete:
-            if error_message := self.hook.is_job_failed(job=self.job):
-                raise AirflowException(
-                    f"Kubernetes job '{self.job.metadata.name}' is failed with error '{error_message}'"
-                )
-            if self.do_xcom_push:
-                return xcom_result[0] if self.unwrap_single and len(xcom_result) == 1 else xcom_result
+                if self.do_xcom_push:
+                    return xcom_result[0] if self.unwrap_single and len(xcom_result) == 1 else xcom_result
+        finally:
+            self._cleanup_monitoring_pods(context)
 
     def execute_deferrable(self):
         self.defer(
@@ -277,39 +283,81 @@ class KubernetesJobOperator(KubernetesPodOperator):
         )
 
     def execute_complete(self, context: Context, event: dict, **kwargs):
-        ti = context["ti"]
-        ti.xcom_push(key="job", value=event["job"])
-        if event["status"] == "error":
-            raise AirflowException(event["message"])
-
-        if self.get_logs:
-            for pod_name in event["pod_names"]:
-                pod_namespace = event["pod_namespace"]
-                try:
-                    pod = self.hook.get_pod(pod_name, pod_namespace)
-                except ApiException as e:
-                    if e.status == 404:
-                        self.log.warning(
-                            "Pod %s in namespace %s not found (possibly deleted). Skipping log retrieval.",
-                            pod_name,
-                            pod_namespace,
-                        )
+        # Resolve monitoring pods up front so the log-retrieval path and the
+        # cleanup path in the finally block share the same lookup (no double
+        # ``hook.get_pod`` calls).
+        pods_by_name: dict[str, k8s.V1Pod] = {}
+        event_job = event.get("job")
+        job_namespace = (
+            event_job.get("metadata", {}).get("namespace") if isinstance(event_job, dict) else None
+        )
+        pod_namespace = event.get("pod_namespace") or event.get("namespace") or job_namespace
+        unresolved_pods: list[tuple[str, str]] = []
+        for pod_name in event.get("pod_names") or []:
+            if not pod_namespace:
+                self.log.warning(
+                    "Skipping pod %s lookup because no pod namespace was provided in trigger event.",
+                    pod_name,
+                )
+                continue
+            try:
+                pod = self.hook.get_pod(pod_name, pod_namespace)
+            except ApiException as e:
+                if e.status == 404:
+                    self.log.warning(
+                        "Pod %s in namespace %s not found (possibly deleted).",
+                        pod_name,
+                        pod_namespace,
+                    )
+                else:
+                    self.log.warning(
+                        "Failed to retrieve pod %s in namespace %s: %s. Skipping.",
+                        pod_name,
+                        pod_namespace,
+                        e,
+                    )
+                    unresolved_pods.append((pod_name, pod_namespace))
+                continue
+            except Exception as e:
+                self.log.warning(
+                    "Failed to retrieve pod %s in namespace %s: %s. Skipping.",
+                    pod_name,
+                    pod_namespace,
+                    e,
+                )
+                unresolved_pods.append((pod_name, pod_namespace))
+                continue
+            if pod is not None:
+                pods_by_name[pod_name] = pod
+
+        try:
+            ti = context["ti"]
+            ti.xcom_push(key="job", value=event["job"])
+            if event["status"] == "error":
+                raise AirflowException(event["message"])
+
+            if self.get_logs:
+                for pod_name in event.get("pod_names") or []:
+                    if pod_name not in pods_by_name:
+                        # Pod was reported by the trigger but missing now (e.g. 404)
+                        self.log.warning("Skipping log retrieval for pod %s (not found).", pod_name)
+                        continue
+                    self._write_logs(pods_by_name[pod_name])
+
+            if self.do_xcom_push:
+                xcom_results: list[Any | None] = []
+                for xcom_result in event["xcom_result"]:
+                    if isinstance(xcom_result, str) and xcom_result.rstrip() == EMPTY_XCOM_RESULT:
+                        self.log.info("xcom result file is empty.")
+                        xcom_results.append(None)
                         continue
-                    raise
-                if not pod:
-                    raise PodNotFoundException("Could not find pod after resuming from deferral")
-                self._write_logs(pod)
-
-        if self.do_xcom_push:
-            xcom_results: list[Any | None] = []
-            for xcom_result in event["xcom_result"]:
-                if isinstance(xcom_result, str) and xcom_result.rstrip() == EMPTY_XCOM_RESULT:
-                    self.log.info("xcom result file is empty.")
-                    xcom_results.append(None)
-                    continue
-                self.log.info("xcom result: \n%s", xcom_result)
-                xcom_results.append(json.loads(xcom_result))
-            return xcom_results[0] if self.unwrap_single and len(xcom_results) == 1 else xcom_results
+                    self.log.info("xcom result: \n%s", xcom_result)
+                    xcom_results.append(json.loads(xcom_result))
+                return xcom_results[0] if self.unwrap_single and len(xcom_results) == 1 else xcom_results
+        finally:
+            self._cleanup_monitoring_pods_from_dict(
+                context, pods_by_name, unresolved_pods=unresolved_pods, event_status=event.get("status")
+            )
 
     @staticmethod
     def deserialize_job_template_file(path: str) -> k8s.V1Job:
@@ -334,6 +382,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
         return api_client._ApiClient__deserialize_model(job, k8s.V1Job)
 
     def on_kill(self) -> None:
+        self._killed = True
         if self.job:
             job = self.job
             kwargs = {
@@ -344,6 +393,97 @@ class KubernetesJobOperator(KubernetesPodOperator):
             if self.termination_grace_period is not None:
                 kwargs.update(grace_period_seconds=self.termination_grace_period)
             self.job_client.delete_namespaced_job(**kwargs)
+        if self.on_kill_action == OnKillAction.KEEP_POD:
+            self.log.info(
+                "Skipping monitoring pod deletion since on_kill_action is set to %r.",
+                self.on_kill_action.value,
+            )
+            return
+        # Monitoring pods discovered via get_pods() have no ownerReferences and
+        # are not reaped by the Job's foreground cascade. Delete them directly.
+        for pod in getattr(self, "pods", None) or []:
+            try:
+                self.pod_manager.delete_pod(pod)
+            except ApiException:
+                self.log.exception(
+                    "Unable to delete monitoring pod %s",
+                    getattr(pod.metadata, "name", "<unknown>"),
+                )
+
+    def _cleanup_monitoring_pods(self, context: Context) -> None:
+        """
+        Run ``post_complete_action`` on each monitoring pod from ``self.pods``.
+
+        Honours ``on_finish_action`` (inherited from ``KubernetesPodOperator``)
+        and runs as a side-effect: any per-pod cleanup error is logged but never
+        masks the in-flight exception (e.g. an ``AirflowException`` raised because
+        the Job itself failed).
+        """
+        # Skip cleanup when control is leaving execute() via TaskDeferred: the
+        # deferred trigger still needs the monitoring pods to exist; the pods
+        # will be cleaned up by execute_complete() on resume.
+        exc = sys.exc_info()[1]
+        if isinstance(exc, TaskDeferred):
+            return
+        for pod in getattr(self, "pods", None) or []:
+            remote_pod = pod
+            try:
+                pod_name = getattr(pod.metadata, "name", None)
+                pod_namespace = getattr(pod.metadata, "namespace", None)
+                if pod_name and pod_namespace:
+                    remote_pod = self.hook.get_pod(name=pod_name, namespace=pod_namespace) or pod
+            except Exception:
+                remote_pod = pod
+            try:
+                self.post_complete_action(
+                    pod=pod,
+                    remote_pod=remote_pod,
+                    context=context,
+                    result=None,
+                )
+            except Exception:
+                # cleanup() can raise AirflowException for failed pods, and the
+                # k8s client can raise transport errors. For the Job operator we
+                # prefer the Job-level failure (or the original exception) to
+                # propagate instead of any per-pod cleanup error.
+                self.log.warning(
+                    "Error while cleaning up monitoring pod %s",
+                    getattr(pod.metadata, "name", "<unknown>"),
+                    exc_info=True,
+                )
+
+    def _cleanup_monitoring_pods_from_dict(
+        self,
+        context: Context,
+        pods_by_name: dict[str, k8s.V1Pod],
+        *,
+        unresolved_pods: list[tuple[str, str]] | None = None,
+        event_status: str | None = None,
+    ) -> None:
+        """
+        Run ``post_complete_action`` on each pod previously resolved via the trigger event.
+
+        Same semantics as :meth:`_cleanup_monitoring_pods` - errors are logged
+        but never mask the in-flight exception.
+        """
+        for pod_name, pod in pods_by_name.items():
+            try:
+                self.post_complete_action(pod=pod, remote_pod=pod, context=context, result=None)
+            except Exception:
+                self.log.warning(
+                    "Error while cleaning up monitoring pod %s",
+                    pod_name,
+                    exc_info=True,
+                )
+        pod_phase = (
+            "Succeeded" if event_status == "success" else "Failed" if event_status == "error" else None
+        )
+        for pod_name, pod_namespace in unresolved_pods or []:
+            fallback_pod = k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(name=pod_name, namespace=pod_namespace),
+                status=k8s.V1PodStatus(phase=pod_phase),
+            )
+            self.process_pod_deletion(fallback_pod, reraise=False)
 
     def build_job_request_obj(self, context: Context | None = None) -> k8s.V1Job:
         """
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
index b60373c2d53..271099598f6 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
@@ -145,8 +145,8 @@ class KubernetesJobTrigger(BaseTrigger):
             {
                 "name": job.metadata.name,
                 "namespace": job.metadata.namespace,
-                "pod_names": [pod_name for pod_name in self.pod_names] if self.get_logs else None,
-                "pod_namespace": self.pod_namespace if self.get_logs else None,
+                "pod_names": list(self.pod_names),
+                "pod_namespace": self.pod_namespace,
                 "status": "error" if error_message else "success",
                 "message": f"Job failed with error: {error_message}"
                 if error_message
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index 6bde7c4772d..a0f3049cce5 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -25,6 +25,7 @@ from unittest.mock import patch
 import pendulum
 import pytest
 from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client.rest import ApiException
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import DAG, DagModel, DagRun, TaskInstance
@@ -33,7 +34,9 @@ from airflow.providers.cncf.kubernetes.operators.job import (
     KubernetesJobOperator,
     KubernetesPatchJobOperator,
 )
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
+from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
@@ -105,6 +108,25 @@ def create_context(task, persist_to_db=False, map_index=None):
     }
 
 
+def _recording_pod_manager():
+    """A ``pod_manager`` mock that records ``delete_pod`` invocations by pod name.
+
+    Tests should assert against ``pm.deleted`` to observe which monitoring pods
+    were actually deleted — the only thing that matters to the cluster.
+    """
+    pm = mock.create_autospec(PodManager, instance=True)
+    pm.deleted = []
+    pm.delete_pod.side_effect = lambda pod: pm.deleted.append(pod.metadata.name)
+    return pm
+
+
+def _pod(name, phase="Succeeded", namespace=POD_NAMESPACE):
+    """Build a real ``V1Pod`` with the minimum fields ``cleanup`` needs."""
+    pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name=name, namespace=namespace))
+    pod.status = k8s.V1PodStatus(phase=phase, container_statuses=[])
+    return pod
+
+
 @pytest.mark.db_test
 @pytest.mark.execution_timeout(300)
 class TestKubernetesJobOperator:
@@ -883,29 +905,73 @@ class TestKubernetesJobOperator:
         mocked_write_logs.assert_not_called()
 
     @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
-    def test_execute_complete_pod_api_error_reraises(self, mock_hook, mocked_write_logs):
-        """Non-404 ApiExceptions should still be raised."""
-        from kubernetes.client.rest import ApiException
+    def test_execute_complete_pod_api_error_does_not_raise(
+        self, mock_hook, mocked_write_logs, mock_pod_manager_prop
+    ):
+        """Non-404 errors fetching the monitoring pod must not fail ``execute_complete``.
 
-        mock_ti = mock.MagicMock()
-        context = {"ti": mock_ti}
-        mock_job = mock.MagicMock()
+        Cleanup is best-effort: log writing is skipped, but the pod is still
+        targeted for deletion by name so it doesn't outlive the job.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.get_pod.side_effect = ApiException(status=403, reason="Forbidden")
         event = {
-            "job": mock_job,
+            "job": {},
             "status": "success",
             "pod_names": [POD_NAME],
             "pod_namespace": POD_NAMESPACE,
             "xcom_result": None,
         }
 
-        mock_hook.get_pod.side_effect = ApiException(status=403, reason="Forbidden")
+        # Must return normally — any raise here is a regression.
+        KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+            context={"ti": mock.create_autospec(TaskInstance, instance=True)}, event=event
+        )
 
-        with pytest.raises(ApiException):
-            KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
-                context=context, event=event
-            )
+        # Best-effort cleanup: pod is still scheduled for deletion by name.
+        assert pm.deleted == [POD_NAME]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+    def test_execute_complete_uses_event_namespace_fallback(
+        self, mock_hook, mocked_write_logs, mock_pod_manager_prop
+    ):
+        """``pod_namespace`` falls back to ``event["namespace"]`` when absent.
+
+        The monitoring pod must be fetched and deleted from that namespace.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        pod = _pod(POD_NAME, namespace=JOB_NAMESPACE)
+
+        fetched_namespaces: list[str] = []
+
+        def get_pod(name, namespace):
+            fetched_namespaces.append(namespace)
+            return pod
+
+        mock_hook.get_pod.side_effect = get_pod
+
+        event = {
+            "job": {"metadata": {"name": JOB_NAME, "namespace": JOB_NAMESPACE}},
+            "namespace": JOB_NAMESPACE,
+            "status": "success",
+            "pod_names": [POD_NAME],
+            "xcom_result": None,
+        }
+
+        KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+            context={"ti": mock.create_autospec(TaskInstance, instance=True)}, event=event
+        )
+
+        assert fetched_namespaces == [JOB_NAMESPACE]
+        assert pm.deleted == [POD_NAME]
 
     @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
@@ -944,7 +1010,7 @@ class TestKubernetesJobOperator:
     @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
     def test_on_kill(self, mock_client):
-        mock_job = mock.MagicMock()
+        mock_job = mock.create_autospec(k8s.V1Job, instance=True)
         mock_job.metadata.name = JOB_NAME
         mock_job.metadata.namespace = JOB_NAMESPACE
 
@@ -952,6 +1018,7 @@ class TestKubernetesJobOperator:
         op.job = mock_job
         op.on_kill()
 
+        assert op._killed is True
         mock_client.delete_namespaced_job.assert_called_once_with(
             name=JOB_NAME,
             namespace=JOB_NAMESPACE,
@@ -961,7 +1028,7 @@ class TestKubernetesJobOperator:
     @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
     def test_on_kill_termination_grace_period(self, mock_client):
-        mock_job = mock.MagicMock()
+        mock_job = mock.create_autospec(k8s.V1Job, instance=True)
         mock_job.metadata.name = JOB_NAME
         mock_job.metadata.namespace = JOB_NAMESPACE
         mock_termination_grace_period = mock.MagicMock()
@@ -991,6 +1058,26 @@ class TestKubernetesJobOperator:
         mock_client.delete_namespaced_job.assert_not_called()
         mock_serialize.assert_not_called()
 
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
+    def test_on_kill_respects_keep_pod_action(self, mock_client, mock_pod_manager_prop):
+        """With ``on_kill_action="keep_pod"`` the job is deleted but monitoring pods survive."""
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_job = mock.create_autospec(k8s.V1Job, instance=True)
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = JOB_NAMESPACE
+        pod = _pod(POD_NAME)
+
+        op = KubernetesJobOperator(task_id="test_task_id", on_kill_action="keep_pod")
+        op.job = mock_job
+        op.pods = [pod]
+        op.on_kill()
+
+        assert op._killed is True
+        assert pm.deleted == []
+
     @pytest.mark.parametrize("parallelism", [1, 2])
     @pytest.mark.parametrize("do_xcom_push", [True, False])
     @pytest.mark.parametrize("get_logs", [True, False])
@@ -1193,6 +1280,279 @@ class TestKubernetesJobOperator:
         mock_hook.return_value.create_job.assert_not_called()
         mock_get_pods.assert_not_called()
 
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleans_up_all_pods_on_success(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """Every monitoring pod is deleted after a successful job, including under ``parallelism>1``.
+
+        Real ``post_complete_action`` runs; only the K8s SDK boundary is faked.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.return_value.is_job_failed.return_value = False
+        pod_1 = _pod("pod-1")
+        pod_2 = _pod("pod-2")
+        mock_get_pods.return_value = [pod_1, pod_2]
+        mock_find_pod.side_effect = [pod_1, pod_2]
+        mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+            pod_1 if name == "pod-1" else pod_2
+        )
+
+        op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True, parallelism=2)
+        op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+        assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleans_up_pod_on_failure(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """When the job fails, the monitoring pod is still deleted and the error propagates."""
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.return_value.is_job_failed.return_value = "Error"
+        pod = _pod("pod-1", phase="Failed")
+        mock_get_pods.return_value = [pod]
+        mock_find_pod.return_value = pod
+        mock_hook.return_value.get_pod.return_value = pod
+
+        op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True)
+        with pytest.raises(AirflowException, match="is failed with error"):
+            op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+        assert pm.deleted == ["pod-1"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_keep_pod_skips_cleanup(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_pod_manager_prop,
+    ):
+        """``on_finish_action=keep_pod`` leaves the monitoring pod untouched."""
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.return_value.is_job_failed.return_value = False
+        pod = _pod("pod-1")
+        mock_get_pods.return_value = [pod]
+        mock_hook.return_value.get_pod.return_value = pod
+
+        op = KubernetesJobOperator(
+            task_id="test_task_id", wait_until_job_complete=True, on_finish_action="keep_pod"
+        )
+        op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+        assert pm.deleted == []
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleanup_error_on_success_does_not_raise(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """A delete failure on one pod must not stop cleanup of the others nor fail the task.
+
+        Exercises the real ``post_complete_action`` against a recording pod_manager
+        whose first ``delete_pod`` raises at the K8s SDK boundary.
+        """
+        mock_hook.return_value.is_job_failed.return_value = False
+        pod_1 = _pod("pod-1")
+        pod_2 = _pod("pod-2")
+        mock_get_pods.return_value = [pod_1, pod_2]
+        mock_find_pod.side_effect = [pod_1, pod_2]
+        mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+            pod_1 if name == "pod-1" else pod_2
+        )
+        pm = _recording_pod_manager()
+        original_side_effect = pm.delete_pod.side_effect
+
+        def delete_side_effect(pod):
+            if pod.metadata.name == "pod-1":
+                raise ApiException(status=500, reason="boom")
+            original_side_effect(pod)
+
+        pm.delete_pod.side_effect = delete_side_effect
+        mock_pod_manager_prop.return_value = pm
+
+        op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True, parallelism=2)
+        op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+        # pod-1's delete blew up; pod-2 still got deleted.
+        assert pm.deleted == ["pod-2"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleanup_error_does_not_mask_job_failure(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """When the job already failed, a cleanup error must not replace the job-level error.
+
+        The recording side_effect lets us observe that cleanup was *attempted*
+        even when it raises — the contract is "attempt cleanup, swallow any
+        error, preserve the original job failure".
+        """
+        mock_hook.return_value.is_job_failed.return_value = "Error"
+        pod = _pod("pod-1", phase="Failed")
+        mock_get_pods.return_value = [pod]
+        mock_find_pod.return_value = pod
+        mock_hook.return_value.get_pod.return_value = pod
+        attempted: list[str] = []
+
+        def boom(pod):
+            attempted.append(pod.metadata.name)
+            raise ApiException(status=500, reason="boom")
+
+        pm = mock.create_autospec(PodManager, instance=True)
+        pm.delete_pod.side_effect = boom
+        mock_pod_manager_prop.return_value = pm
+
+        op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True)
+        # The job-level error wins; the cleanup ApiException is swallowed.
+        with pytest.raises(AirflowException, match="is failed with error"):
+            op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+        # Cleanup was still attempted even though it raised.
+        assert attempted == ["pod-1"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.execute_deferrable"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_does_not_cleanup_when_deferring(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_execute_deferrable,
+        mock_pod_manager_prop,
+    ):
+        """A ``TaskDeferred`` on the way out of ``execute()`` must leave pods alive.
+
+        The trigger still needs to watch them; cleanup happens on resume in
+        ``execute_complete``.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        pod = _pod("pod-1")
+        mock_get_pods.return_value = [pod]
+        mock_execute_deferrable.side_effect = TaskDeferred(
+            trigger=mock.create_autospec(KubernetesJobTrigger, instance=True), method_name="execute_complete"
+        )
+
+        op = KubernetesJobOperator(task_id="test_task_id", wait_until_job_complete=True, deferrable=True)
+        with pytest.raises(TaskDeferred):
+            op.execute(context={"ti": mock.create_autospec(TaskInstance, instance=True)})
+
+        assert pm.deleted == []
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(HOOK_CLASS)
+    def test_execute_complete_cleans_up_pods(self, mock_hook, mock_pod_manager_prop):
+        """Deferrable resume path cleans up every monitoring pod listed in the event.
+
+        Covers ``parallelism>1`` and the original leak vector ``get_logs=False``.
+        Real ``post_complete_action`` runs; only the K8s SDK boundary is faked.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        pod_1 = _pod("pod-1")
+        pod_2 = _pod("pod-2")
+        mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+            pod_1 if name == "pod-1" else pod_2
+        )
+        event = {
+            "status": "success",
+            "message": "ok",
+            "job": {"metadata": {"name": JOB_NAME, "namespace": JOB_NAMESPACE}},
+            "pod_names": ["pod-1", "pod-2"],
+            "pod_namespace": POD_NAMESPACE,
+            "xcom_result": None,
+        }
+
+        KubernetesJobOperator(task_id="test_task_id", get_logs=False).execute_complete(
+            context={"ti": mock.create_autospec(TaskInstance, instance=True)}, event=event
+        )
+
+        assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
+    def test_on_kill_deletes_monitoring_pods(self, mock_client, mock_pod_manager_prop):
+        """``on_kill`` deletes the job and every monitoring pod owned by the operator."""
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_job = mock.create_autospec(k8s.V1Job, instance=True)
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = JOB_NAMESPACE
+        pod_1 = _pod("pod-1")
+        pod_2 = _pod("pod-2")
+
+        op = KubernetesJobOperator(task_id="test_task_id")
+        op.job = mock_job
+        op.pods = [pod_1, pod_2]
+        op.on_kill()
+
+        assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
 
 @pytest.mark.db_test
 @pytest.mark.execution_timeout(300)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py
index 4f6f6597fcf..95d16776b81 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
+from kubernetes.client import models as k8s
 
 from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
 from airflow.triggers.base import TriggerEvent
@@ -120,6 +121,39 @@ class TestKubernetesJobTrigger:
             }
         )
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_CLASS}.hook")
+    async def test_run_success_emits_pod_info_when_get_logs_false(self, mock_hook):
+        """pod_names/pod_namespace must be in the event even with get_logs=False.
+
+        The operator's execute_complete needs them to clean up monitoring pods.
+        """
+        trigger_no_logs = KubernetesJobTrigger(
+            job_name=JOB_NAME,
+            job_namespace=NAMESPACE,
+            pod_names=[POD_NAME],
+            pod_namespace=NAMESPACE,
+            base_container_name=CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_file=CONFIG_FILE,
+            in_cluster=IN_CLUSTER,
+            get_logs=False,
+            do_xcom_push=XCOM_PUSH,
+        )
+        mock_job = mock.create_autospec(k8s.V1Job, instance=True)
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = NAMESPACE
+        mock_hook.wait_until_job_complete.side_effect = mock.AsyncMock(return_value=mock_job)
+        mock_hook.is_job_failed.return_value = False
+
+        event_actual = await trigger_no_logs.run().asend(None)
+
+        assert event_actual.payload["pod_names"] == [POD_NAME]
+        assert event_actual.payload["pod_namespace"] == NAMESPACE
+        assert event_actual.payload["status"] == "success"
+
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_CLASS}.hook")
     async def test_run_fail(self, mock_hook, trigger):

