This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c8114f1f94 Fix KubernetesJobOperator failing when pods are deleted after job completion (#63569)
9c8114f1f94 is described below

commit 9c8114f1f94463f86a5aae42d54ac04910551572
Author: Shubham Gondane <[email protected]>
AuthorDate: Sat Mar 14 02:20:30 2026 -0700

    Fix KubernetesJobOperator failing when pods are deleted after job completion (#63569)
    
    * Fix KubernetesJobOperator failing when pods are deleted after job completion
    
    Handle 404 ApiException in execute_complete to skip log retrieval gracefully instead of failing the task when pods are cleaned up before Airflow fetches logs.
    
    * Add multi-pod test for partial pod deletion in execute_complete
---
 .../providers/cncf/kubernetes/operators/job.py     | 12 ++-
 .../unit/cncf/kubernetes/operators/test_job.py     | 86 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 22e80514996..31f000272d7 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -286,7 +286,17 @@ class KubernetesJobOperator(KubernetesPodOperator):
         if self.get_logs:
             for pod_name in event["pod_names"]:
                 pod_namespace = event["pod_namespace"]
-                pod = self.hook.get_pod(pod_name, pod_namespace)
+                try:
+                    pod = self.hook.get_pod(pod_name, pod_namespace)
+                except ApiException as e:
+                    if e.status == 404:
+                        self.log.warning(
+                            "Pod %s in namespace %s not found (possibly deleted). Skipping log retrieval.",
+                            pod_name,
+                            pod_namespace,
+                        )
+                        continue
+                    raise
                 if not pod:
                     raise PodNotFoundException("Could not find pod after resuming from deferral")
                 self._write_logs(pod)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index ab0b7943a3b..4e1bd0c0eb0 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -855,6 +855,92 @@ class TestKubernetesJobOperator:
 
         mock_ti.xcom_push.assert_called_once_with(key="job", value=mock_job)
 
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+    def test_execute_complete_pod_not_found_skips_logs(self, mock_hook, mocked_write_logs):
+        """When a pod is deleted before log retrieval, the task should succeed instead of failing."""
+        from kubernetes.client.rest import ApiException
+
+        mock_ti = mock.MagicMock()
+        context = {"ti": mock_ti}
+        mock_job = mock.MagicMock()
+        event = {
+            "job": mock_job,
+            "status": "success",
+            "pod_names": [POD_NAME],
+            "pod_namespace": POD_NAMESPACE,
+            "xcom_result": None,
+        }
+
+        mock_hook.get_pod.side_effect = ApiException(status=404, reason="Not Found")
+
+        KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+            context=context, event=event
+        )
+
+        mock_hook.get_pod.assert_called_once_with(POD_NAME, POD_NAMESPACE)
+        mocked_write_logs.assert_not_called()
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+    def test_execute_complete_pod_api_error_reraises(self, mock_hook, mocked_write_logs):
+        """Non-404 ApiExceptions should still be raised."""
+        from kubernetes.client.rest import ApiException
+
+        mock_ti = mock.MagicMock()
+        context = {"ti": mock_ti}
+        mock_job = mock.MagicMock()
+        event = {
+            "job": mock_job,
+            "status": "success",
+            "pod_names": [POD_NAME],
+            "pod_namespace": POD_NAMESPACE,
+            "xcom_result": None,
+        }
+
+        mock_hook.get_pod.side_effect = ApiException(status=403, reason="Forbidden")
+
+        with pytest.raises(ApiException):
+            KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+                context=context, event=event
+            )
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+    def test_execute_complete_multi_pod_partial_not_found(self, mock_hook, mocked_write_logs):
+        """With multiple pods, deleted pods are skipped while surviving pods still get logs written."""
+        from kubernetes.client.rest import ApiException
+
+        mock_ti = mock.MagicMock()
+        context = {"ti": mock_ti}
+        mock_job = mock.MagicMock()
+        surviving_pod = mock.MagicMock()
+
+        event = {
+            "job": mock_job,
+            "status": "success",
+            "pod_names": ["deleted-pod", "surviving-pod"],
+            "pod_namespace": POD_NAMESPACE,
+            "xcom_result": None,
+        }
+
+        def get_pod_side_effect(name, namespace):
+            if name == "deleted-pod":
+                raise ApiException(status=404, reason="Not Found")
+            return surviving_pod
+
+        mock_hook.get_pod.side_effect = get_pod_side_effect
+
+        KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+            context=context, event=event
+        )
+
+        assert mock_hook.get_pod.call_count == 2
+        mocked_write_logs.assert_called_once_with(surviving_pod)
+
     @pytest.mark.non_db_test_override
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
     def test_on_kill(self, mock_client):

Reply via email to