This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 47a2b9ee7f Add container_resources as KubernetesPodOperator
templatable (#27457)
47a2b9ee7f is described below
commit 47a2b9ee7f1ff2cc1cc1aa1c3d1b523c88ba29fb
Author: alanatlemba <[email protected]>
AuthorDate: Wed Nov 9 02:47:55 2022 -0600
Add container_resources as KubernetesPodOperator templatable (#27457)
closes https://github.com/apache/airflow/issues/23529
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 8 ++++-
.../kubernetes/operators/test_kubernetes_pod.py | 35 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index b22d0780c7..c1735158ec 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -137,7 +137,7 @@ class KubernetesPodOperator(BaseOperator):
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
- :param container_resources: resources for the launched pod.
+ :param container_resources: resources for the launched pod. (templated)
:param affinity: affinity scheduling rules for the launched pod.
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
@@ -185,6 +185,7 @@ class KubernetesPodOperator(BaseOperator):
"config_file",
"pod_template_file",
"namespace",
+ "container_resources",
)
template_fields_renderers = {"env_vars": "py"}
@@ -320,6 +321,11 @@ class KubernetesPodOperator(BaseOperator):
self._do_render_template_fields(content, ("value", "name"),
context, jinja_env, seen_oids)
return
+ if id(content) not in seen_oids and isinstance(content,
k8s.V1ResourceRequirements):
+ seen_oids.add(id(content))
+ self._do_render_template_fields(content, ("limits", "requests"),
context, jinja_env, seen_oids)
+ return
+
super()._render_nested_template_fields(content, context, jinja_env,
seen_oids)
@staticmethod
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index f3d462f3c2..a354a53702 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -101,6 +101,41 @@ class TestKubernetesPodOperator:
patch.stopall()
+ def test_templates(self, create_task_instance_of_operator):
+ dag_id = "TestKubernetesPodOperator"
+ ti = create_task_instance_of_operator(
+ KubernetesPodOperator,
+ dag_id=dag_id,
+ task_id="task-id",
+ namespace="{{ dag.dag_id }}",
+ container_resources=k8s.V1ResourceRequirements(
+ requests={"memory": "{{ dag.dag_id }}", "cpu": "{{ dag.dag_id
}}"},
+ limits={"memory": "{{ dag.dag_id }}", "cpu": "{{ dag.dag_id
}}"},
+ ),
+ pod_template_file="{{ dag.dag_id }}",
+ config_file="{{ dag.dag_id }}",
+ labels="{{ dag.dag_id }}",
+ env_vars=["{{ dag.dag_id }}"],
+ arguments="{{ dag.dag_id }}",
+ cmds="{{ dag.dag_id }}",
+ image="{{ dag.dag_id }}",
+ )
+
+ rendered = ti.render_templates()
+
+ assert dag_id == rendered.container_resources.limits["memory"]
+ assert dag_id == rendered.container_resources.limits["cpu"]
+ assert dag_id == rendered.container_resources.requests["memory"]
+ assert dag_id == rendered.container_resources.requests["cpu"]
+ assert dag_id == ti.task.image
+ assert dag_id == ti.task.cmds
+ assert dag_id == ti.task.namespace
+ assert dag_id == ti.task.config_file
+ assert dag_id == ti.task.labels
+ assert dag_id == ti.task.pod_template_file
+ assert dag_id == ti.task.arguments
+ assert dag_id == ti.task.env_vars[0]
+
def run_pod(self, operator: KubernetesPodOperator, map_index: int = -1) ->
k8s.V1Pod:
with self.dag_maker(dag_id="dag") as dag:
operator.dag = dag