This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d8086a3db5 Add possibility to disable logging the pod template in a 
case when task fails (#31595)
d8086a3db5 is described below

commit d8086a3db5ef020ef28158249105e7ce5639b1a5
Author: Serhii Dimchenko <[email protected]>
AuthorDate: Mon Jun 5 14:15:48 2023 +0200

    Add possibility to disable logging the pod template in a case when task 
fails (#31595)
---
 airflow/providers/cncf/kubernetes/operators/pod.py    | 16 +++++++++++++---
 tests/providers/cncf/kubernetes/operators/test_pod.py | 17 ++++++++++++++++-
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 2edc3911e4..2ca80b84f5 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -225,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):
         container name to use.
     :param deferrable: Run operator in the deferrable mode.
     :param poll_interval: Polling period in seconds to check for the status. 
Used only in deferrable mode.
+    :param log_pod_spec_on_failure: Log the pod's specification if a failure 
occurs
     """
 
     # This field can be overloaded at the instance level via 
base_container_name
@@ -301,6 +302,7 @@ class KubernetesPodOperator(BaseOperator):
         base_container_name: str | None = None,
         deferrable: bool = False,
         poll_interval: float = 2,
+        log_pod_spec_on_failure: bool = True,
         **kwargs,
     ) -> None:
         # TODO: remove in provider 6.0.0 release. This is a mitigate step to 
advise users to switch to the
@@ -381,6 +383,7 @@ class KubernetesPodOperator(BaseOperator):
         self.deferrable = deferrable
         self.poll_interval = poll_interval
         self.remote_pod: k8s.V1Pod | None = None
+        self.log_pod_spec_on_failure = log_pod_spec_on_failure
         self._config_dict: dict | None = None  # TODO: remove it when removing 
convert_config_file_to_dict
 
     @cached_property
@@ -676,7 +679,6 @@ class KubernetesPodOperator(BaseOperator):
             self.process_pod_deletion(remote_pod, reraise=False)
 
             error_message = get_container_termination_message(remote_pod, 
self.base_container_name)
-            error_message = "\n" + error_message if error_message else ""
             if self.skip_on_exit_code is not None:
                 container_statuses = (
                     remote_pod.status.container_statuses if remote_pod and 
remote_pod.status else None
@@ -697,8 +699,16 @@ class KubernetesPodOperator(BaseOperator):
                         f"{self.skip_on_exit_code}. Skipping."
                     )
             raise AirflowException(
-                f"Pod {pod and pod.metadata.name} returned a 
failure:\n{error_message}\n"
-                f"remote_pod: {remote_pod}"
+                "\n".join(
+                    filter(
+                        None,
+                        [
+                            f"Pod {pod and pod.metadata.name} returned a 
failure.",
+                            error_message if isinstance(error_message, str) 
else None,
+                            f"remote_pod: {remote_pod}" if 
self.log_pod_spec_on_failure else None,
+                        ],
+                    )
+                )
             )
         else:
             self.process_pod_deletion(remote_pod, reraise=False)
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 7c06fa17f9..40feb30789 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
 
 import pendulum
 import pytest
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, V1PodStatus, models as k8s
 from pytest import param
 from urllib3 import HTTPResponse
 from urllib3.packages.six import BytesIO
@@ -34,6 +34,7 @@ from airflow.models import DAG, DagModel, DagRun, TaskInstance
 from airflow.models.xcom import XCom
 from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator, _optionally_suppress
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
 from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -1537,3 +1538,17 @@ class TestKubernetesPodOperatorAsync:
             post_complete_action.assert_called_once()
         else:
             mock_manager.return_value.read_pod_logs.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "log_pod_spec_on_failure,expect_match",
+        [
+            (True, r"Pod task-.* returned a failure.\nremote_pod:.*"),
+            (False, r"Pod task-.* returned a failure.(?!\nremote_pod:)"),
+        ],
+    )
+    def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, 
expect_match):
+        k = KubernetesPodOperator(task_id="task", 
log_pod_spec_on_failure=log_pod_spec_on_failure)
+        pod = k.build_pod_request_obj(create_context(k))
+        pod.status = V1PodStatus(phase=PodPhase.FAILED)
+        with pytest.raises(AirflowException, match=expect_match):
+            k.cleanup(pod, pod)

Reply via email to