This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d8086a3db5 Add possibility to disable logging the pod template in a
case when task fails (#31595)
d8086a3db5 is described below
commit d8086a3db5ef020ef28158249105e7ce5639b1a5
Author: Serhii Dimchenko <[email protected]>
AuthorDate: Mon Jun 5 14:15:48 2023 +0200
Add possibility to disable logging the pod template in a case when task
fails (#31595)
---
airflow/providers/cncf/kubernetes/operators/pod.py | 16 +++++++++++++---
tests/providers/cncf/kubernetes/operators/test_pod.py | 17 ++++++++++++++++-
2 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 2edc3911e4..2ca80b84f5 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -225,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):
container name to use.
:param deferrable: Run operator in the deferrable mode.
:param poll_interval: Polling period in seconds to check for the status.
Used only in deferrable mode.
+ :param log_pod_spec_on_failure: Log the pod's specification if a failure occurs
"""
# This field can be overloaded at the instance level via base_container_name
@@ -301,6 +302,7 @@ class KubernetesPodOperator(BaseOperator):
base_container_name: str | None = None,
deferrable: bool = False,
poll_interval: float = 2,
+ log_pod_spec_on_failure: bool = True,
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
@@ -381,6 +383,7 @@ class KubernetesPodOperator(BaseOperator):
self.deferrable = deferrable
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
+ self.log_pod_spec_on_failure = log_pod_spec_on_failure
self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict
@cached_property
@@ -676,7 +679,6 @@ class KubernetesPodOperator(BaseOperator):
self.process_pod_deletion(remote_pod, reraise=False)
error_message = get_container_termination_message(remote_pod, self.base_container_name)
- error_message = "\n" + error_message if error_message else ""
if self.skip_on_exit_code is not None:
container_statuses = (
remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
@@ -697,8 +699,16 @@ class KubernetesPodOperator(BaseOperator):
f"{self.skip_on_exit_code}. Skipping."
)
raise AirflowException(
- f"Pod {pod and pod.metadata.name} returned a failure:\n{error_message}\n"
- f"remote_pod: {remote_pod}"
+ "\n".join(
+ filter(
+ None,
+ [
+ f"Pod {pod and pod.metadata.name} returned a failure.",
+ error_message if isinstance(error_message, str) else None,
+ f"remote_pod: {remote_pod}" if self.log_pod_spec_on_failure else None,
+ ],
+ )
+ )
)
else:
self.process_pod_deletion(remote_pod, reraise=False)
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 7c06fa17f9..40feb30789 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
import pendulum
import pytest
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, V1PodStatus, models as k8s
from pytest import param
from urllib3 import HTTPResponse
from urllib3.packages.six import BytesIO
@@ -34,6 +34,7 @@ from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.models.xcom import XCom
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator, _optionally_suppress
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -1537,3 +1538,17 @@ class TestKubernetesPodOperatorAsync:
post_complete_action.assert_called_once()
else:
mock_manager.return_value.read_pod_logs.assert_not_called()
+
+ @pytest.mark.parametrize(
+ "log_pod_spec_on_failure,expect_match",
+ [
+ (True, r"Pod task-.* returned a failure.\nremote_pod:.*"),
+ (False, r"Pod task-.* returned a failure.(?!\nremote_pod:)"),
+ ],
+ )
+ def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match):
+ k = KubernetesPodOperator(task_id="task", log_pod_spec_on_failure=log_pod_spec_on_failure)
+ pod = k.build_pod_request_obj(create_context(k))
+ pod.status = V1PodStatus(phase=PodPhase.FAILED)
+ with pytest.raises(AirflowException, match=expect_match):
+ k.cleanup(pod, pod)