pshrivastava27 opened a new issue, #29432: URL: https://github.com/apache/airflow/issues/29432
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Google Cloud Composer Version - 2.1.5
Airflow Version - 2.4.3

We are trying to use dynamic task mapping with the KubernetesPodOperator. Our use case is to return the pod's CPU and memory requirements from a function that is included as a macro in the DAG.

Without dynamic task mapping it works perfectly, but with dynamic task mapping the macro is not recognized.

`container_resources` is a templated field as per the [docs](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/kubernetes_pod/index.html#airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator); the feature was introduced in this [PR](https://github.com/apache/airflow/pull/27457). We also tried toggling the boolean `render_template_as_native_obj`, but still no luck.

Below is a trimmed version of our DAG to help reproduce the issue. (The functions that return CPU and memory are trivial here, just to show an example.)

### What you think should happen instead

It should work the same with or without dynamic task mapping.

### How to reproduce

Deployed the following DAG in Google Cloud Composer.

```
import datetime
import os

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

dvt_image = os.environ.get("DVT_IMAGE")

default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


with models.DAG(
    "sample_dag",
    schedule_interval=None,
    default_args=default_dag_args,
    render_template_as_native_obj=True,
    user_defined_macros={
        "pod_mem": pod_mem,
        "pod_cpu": pod_cpu,
    },
) as dag:
    task_1 = KubernetesPodOperator(
        task_id="task_1",
        name="task_1",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    )

    task_2 = KubernetesPodOperator.partial(
        task_id="task_2",
        name="task_2",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    ).expand(arguments=[["echo hello"]])

    task_1 >> task_2
```

task_1 (without dynamic task mapping) completes successfully, while task_2 (with dynamic task mapping) fails.

Looking at the error logs, it failed while rendering the Pod spec since the calls to pod_cpu() and pod_mem() are unresolved.

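A minimal sketch of why the API server rejects the Pod for task_2, using only Python's `re` module. It applies the quantity pattern quoted in the 400 response in the traceback below to the limits of both tasks. The helper name `unrendered_limits` is hypothetical, and the "rendered" values are assumed to be simply what `pod_mem()` and `pod_cpu()` return.

```python
import re

# Quantity pattern copied verbatim from the API server's 400 response.
QUANTITY_RE = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")


def unrendered_limits(limits):
    """Return the keys whose values do not match the quantity pattern.

    Hypothetical helper for illustration; not part of Airflow or the
    Kubernetes client.
    """
    return [key for key, value in limits.items() if not QUANTITY_RE.match(value)]


# Assumed values for task_1 once the macros have been rendered.
rendered = {"memory": "4000M", "cpu": "1000m"}

# Values task_2 actually sends, as shown in the failing Pod spec.
unrendered = {"memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}"}

print(unrendered_limits(rendered))    # []
print(unrendered_limits(unrendered))  # ['memory', 'cpu']
```

The literal `{{ ... }}` strings start with a brace rather than a sign or digit, so they can never match the pattern; this is consistent with the template being passed through unrendered for the mapped task.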
Here is the traceback:

Exception when attempting to create Namespaced Pod: {
  "apiVersion": "v1",
  "kind": "Pod",
  "metadata": {
    "annotations": {},
    "labels": {
      "dag_id": "sample_dag",
      "task_id": "task_2",
      "run_id": "manual__2023-02-08T183926.890852Z-eee90e4ee",
      "kubernetes_pod_operator": "True",
      "map_index": "0",
      "try_number": "2",
      "airflow_version": "2.4.3-composer",
      "airflow_kpo_in_cluster": "False"
    },
    "name": "task-2-46f76eb0432d42ae9a331a6fc53835b3",
    "namespace": "default"
  },
  "spec": {
    "affinity": {},
    "containers": [
      {
        "args": [
          "echo hello"
        ],
        "command": [
          "bash",
          "-cx"
        ],
        "env": [],
        "envFrom": [],
        "image": "us.gcr.io/ams-e2e-testing/edw-dvt-tool",
        "imagePullPolicy": "Always",
        "name": "base",
        "ports": [],
        "resources": {
          "limits": {
            "memory": "{{ pod_mem() }}",
            "cpu": "{{ pod_cpu() }}"
          }
        },
        "volumeMounts": []
      }
    ],
    "hostNetwork": false,
    "imagePullSecrets": [],
    "initContainers": [],
    "nodeSelector": {},
    "restartPolicy": "Never",
    "securityContext": {},
    "serviceAccountName": "sa-k8s",
    "tolerations": [],
    "volumes": []
  }
}
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 143, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '1ef20c0b-6980-4173-b9cc-9af5b4792e86', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1b263a21-4c75-4ef8-8147-c18780a13f0e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3cd4cda4-908c-4944-a422-5512b0fb88d6', 'Date': 'Wed, 08 Feb 2023 18:45:23 GMT', 'Content-Length': '256'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}

### Operating System

Google Composer Kubernetes Cluster

### Versions of Apache Airflow Providers

_No response_

### Deployment

Composer

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
