This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bcc7856b80 Add `termination_message_policy` parameter to `KubernetesPodOperator` (#32885)
bcc7856b80 is described below
commit bcc7856b80ee6f48de0395089b89424cf4b1c98a
Author: Daniele Cesarini <[email protected]>
AuthorDate: Sat Jul 29 10:52:48 2023 +0100
Add `termination_message_policy` parameter to `KubernetesPodOperator` (#32885)
* KubernetesPodOperator: add termination_message_policy option
Allow setting termination_message_policy in the Pod container
---------
Co-authored-by: eladkal <[email protected]>
Co-authored-by: Hussein Awala <[email protected]>
---
airflow/providers/cncf/kubernetes/operators/pod.py | 5 +++++
kubernetes_tests/test_kubernetes_pod_operator.py | 2 ++
tests/providers/cncf/kubernetes/operators/test_pod.py | 17 +++++++++++++++++
3 files changed, 24 insertions(+)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 6e6acbec4e..704745e433 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -238,6 +238,8 @@ class KubernetesPodOperator(BaseOperator):
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
Deprecated - use `on_finish_action` instead.
+ :param termination_message_policy: The termination message policy of the base container.
+ Default value is "File"
"""
# This field can be overloaded at the instance level via base_container_name
@@ -317,6 +319,7 @@ class KubernetesPodOperator(BaseOperator):
log_pod_spec_on_failure: bool = True,
on_finish_action: str = "delete_pod",
is_delete_operator_pod: None | bool = None,
+ termination_message_policy: str = "File",
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
@@ -415,6 +418,7 @@ class KubernetesPodOperator(BaseOperator):
else:
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
+ self.termination_message_policy = termination_message_policy
self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict
@@ -838,6 +842,7 @@ class KubernetesPodOperator(BaseOperator):
env=self.env_vars,
env_from=self.env_from,
security_context=self.container_security_context,
+ termination_message_policy=self.termination_message_policy,
)
],
image_pull_secrets=self.image_pull_secrets,
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index d4a621055e..002394611d 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -124,6 +124,7 @@ class TestKubernetesPodOperatorSystem:
"envFrom": [],
"name": "base",
"ports": [],
+ "terminationMessagePolicy": "File",
"volumeMounts": [],
}
],
@@ -957,6 +958,7 @@ class TestKubernetesPodOperatorSystem:
"name": "base",
"ports": [],
"resources": {"limits": {"memory": "200Mi"}, "requests": {"memory": "100Mi"}},
+ "terminationMessagePolicy": "File",
"volumeMounts": [{"mountPath": "/airflow/xcom", "name": "xcom"}],
},
{
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 4383ae32f6..3c1b752f71 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -589,6 +589,23 @@ class TestKubernetesPodOperator:
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[0].image_pull_policy == "Always"
+ def test_termination_message_policy_correctly_set(self):
+ k = KubernetesPodOperator(
+ task_id="task",
+ termination_message_policy="FallbackToLogsOnError",
+ )
+
+ pod = k.build_pod_request_obj(create_context(k))
+ assert pod.spec.containers[0].termination_message_policy == "FallbackToLogsOnError"
+
+ def test_termination_message_policy_default_value_correctly_set(self):
+ k = KubernetesPodOperator(
+ task_id="task",
+ )
+
+ pod = k.build_pod_request_obj(create_context(k))
+ assert pod.spec.containers[0].termination_message_policy == "File"
+
@pytest.mark.parametrize(
"task_kwargs, should_be_deleted",
[