This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9c8114f1f94 Fix KubernetesJobOperator failing when pods are deleted after job completion (#63569)
9c8114f1f94 is described below
commit 9c8114f1f94463f86a5aae42d54ac04910551572
Author: Shubham Gondane <[email protected]>
AuthorDate: Sat Mar 14 02:20:30 2026 -0700
Fix KubernetesJobOperator failing when pods are deleted after job completion (#63569)
* Fix KubernetesJobOperator failing when pods are deleted after job completion
Handle 404 ApiException in execute_complete to skip log retrieval
gracefully instead of failing the task when pods are cleaned up before Airflow
fetches logs.
* Add multi-pod test for partial pod deletion in execute_complete
---
.../providers/cncf/kubernetes/operators/job.py | 12 ++-
.../unit/cncf/kubernetes/operators/test_job.py | 86 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 1 deletion(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 22e80514996..31f000272d7 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -286,7 +286,17 @@ class KubernetesJobOperator(KubernetesPodOperator):
if self.get_logs:
for pod_name in event["pod_names"]:
pod_namespace = event["pod_namespace"]
- pod = self.hook.get_pod(pod_name, pod_namespace)
+ try:
+ pod = self.hook.get_pod(pod_name, pod_namespace)
+ except ApiException as e:
+ if e.status == 404:
+ self.log.warning(
+ "Pod %s in namespace %s not found (possibly deleted). Skipping log retrieval.",
+ pod_name,
+ pod_namespace,
+ )
+ continue
+ raise
if not pod:
raise PodNotFoundException("Could not find pod after resuming from deferral")
self._write_logs(pod)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index ab0b7943a3b..4e1bd0c0eb0 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -855,6 +855,92 @@ class TestKubernetesJobOperator:
mock_ti.xcom_push.assert_called_once_with(key="job", value=mock_job)
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+ def test_execute_complete_pod_not_found_skips_logs(self, mock_hook, mocked_write_logs):
+ """When a pod is deleted before log retrieval, the task should succeed instead of failing."""
+ from kubernetes.client.rest import ApiException
+
+ mock_ti = mock.MagicMock()
+ context = {"ti": mock_ti}
+ mock_job = mock.MagicMock()
+ event = {
+ "job": mock_job,
+ "status": "success",
+ "pod_names": [POD_NAME],
+ "pod_namespace": POD_NAMESPACE,
+ "xcom_result": None,
+ }
+
+ mock_hook.get_pod.side_effect = ApiException(status=404, reason="Not Found")
+
+ KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+ context=context, event=event
+ )
+
+ mock_hook.get_pod.assert_called_once_with(POD_NAME, POD_NAMESPACE)
+ mocked_write_logs.assert_not_called()
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+ def test_execute_complete_pod_api_error_reraises(self, mock_hook, mocked_write_logs):
+ """Non-404 ApiExceptions should still be raised."""
+ from kubernetes.client.rest import ApiException
+
+ mock_ti = mock.MagicMock()
+ context = {"ti": mock_ti}
+ mock_job = mock.MagicMock()
+ event = {
+ "job": mock_job,
+ "status": "success",
+ "pod_names": [POD_NAME],
+ "pod_namespace": POD_NAMESPACE,
+ "xcom_result": None,
+ }
+
+ mock_hook.get_pod.side_effect = ApiException(status=403, reason="Forbidden")
+
+ with pytest.raises(ApiException):
+ KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+ context=context, event=event
+ )
+
+ @pytest.mark.non_db_test_override
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator._write_logs"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"))
+ def test_execute_complete_multi_pod_partial_not_found(self, mock_hook, mocked_write_logs):
+ """With multiple pods, deleted pods are skipped while surviving pods still get logs written."""
+ from kubernetes.client.rest import ApiException
+
+ mock_ti = mock.MagicMock()
+ context = {"ti": mock_ti}
+ mock_job = mock.MagicMock()
+ surviving_pod = mock.MagicMock()
+
+ event = {
+ "job": mock_job,
+ "status": "success",
+ "pod_names": ["deleted-pod", "surviving-pod"],
+ "pod_namespace": POD_NAMESPACE,
+ "xcom_result": None,
+ }
+
+ def get_pod_side_effect(name, namespace):
+ if name == "deleted-pod":
+ raise ApiException(status=404, reason="Not Found")
+ return surviving_pod
+
+ mock_hook.get_pod.side_effect = get_pod_side_effect
+
+ KubernetesJobOperator(task_id="test_task_id", get_logs=True, do_xcom_push=False).execute_complete(
+ context=context, event=event
+ )
+
+ assert mock_hook.get_pod.call_count == 2
+ mocked_write_logs.assert_called_once_with(surviving_pod)
+
@pytest.mark.non_db_test_override
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.job_client"))
def test_on_kill(self, mock_client):