pshrivastava27 opened a new issue, #29432:
URL: https://github.com/apache/airflow/issues/29432

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Google Cloud Composer Version - 2.1.5
   Airflow Version - 2.4.3
   
   We are trying to use dynamic task mapping with the Kubernetes Pod Operator. Our 
use case is to return the pod's CPU and memory requirements from functions that 
are registered as user-defined macros in the DAG.
   
   Without dynamic task mapping this works perfectly, but when used with 
dynamic task mapping, the macros are never resolved.
   
   `container_resources` is a templated field as per the 
[docs](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/kubernetes_pod/index.html#airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator); 
the feature was introduced in this 
[PR](https://github.com/apache/airflow/pull/27457).
   We also tried toggling the boolean `render_template_as_native_obj`, but 
still no luck.
   
   A trimmed version of our DAG is provided below to help reproduce the issue 
(the functions returning CPU and memory are trivial here, just to illustrate).
   
   
   ### What you think should happen instead
   
   The macros should render the same way with or without dynamic task mapping.
   
   ### How to reproduce
   
   Deployed the following DAG in Google Cloud Composer.
   ```python
   import datetime
   import os
   
   from airflow import models
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
       KubernetesPodOperator,
   )
   from kubernetes.client import models as k8s_models
   
   dvt_image = os.environ.get("DVT_IMAGE")
   
   default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}
   
   
   def pod_mem():
       return "4000M"
   
   
   def pod_cpu():
       return "1000m"
   
   
   with models.DAG(
       "sample_dag",
       schedule_interval=None,
       default_args=default_dag_args,
       render_template_as_native_obj=True,
       user_defined_macros={
           "pod_mem": pod_mem,
           "pod_cpu": pod_cpu,
       },
   ) as dag:
   
       task_1 = KubernetesPodOperator(
           task_id="task_1",
           name="task_1",
           namespace="default",
           image=dvt_image,
           cmds=["bash", "-cx"],
           arguments=["echo hello"],
           service_account_name="sa-k8s",
           container_resources=k8s_models.V1ResourceRequirements(
               limits={
                   "memory": "{{ pod_mem() }}",
                   "cpu": "{{ pod_cpu() }}",
               }
           ),
           startup_timeout_seconds=1800,
           get_logs=True,
           image_pull_policy="Always",
           config_file="/home/airflow/composer_kube_config",
           dag=dag,
       )
   
       task_2 = KubernetesPodOperator.partial(
           task_id="task_2",
           name="task_2",
           namespace="default",
           image=dvt_image,
           cmds=["bash", "-cx"],
           service_account_name="sa-k8s",
           container_resources=k8s_models.V1ResourceRequirements(
               limits={
                   "memory": "{{ pod_mem() }}",
                   "cpu": "{{ pod_cpu() }}",
               }
           ),
           startup_timeout_seconds=1800,
           get_logs=True,
           image_pull_policy="Always",
           config_file="/home/airflow/composer_kube_config",
           dag=dag,
       ).expand(arguments=[["echo hello"]])
   
       task_1 >> task_2
   ```
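   As a possible workaround while the macros are not resolved for mapped tasks 
(an untested sketch, not a confirmed fix), the helper functions could be called 
at DAG-parse time instead of via Jinja, so the mapped operator receives literal 
quantity strings that need no template rendering:

   ```python
   def pod_mem():
       return "4000M"


   def pod_cpu():
       return "1000m"


   # Build the limits dict at DAG-parse time; no template rendering is needed,
   # so the mapped task cannot fail on an unrendered "{{ pod_mem() }}" string.
   limits = {"memory": pod_mem(), "cpu": pod_cpu()}
   ```

   The resulting `limits` dict would then be passed to 
`k8s_models.V1ResourceRequirements(limits=limits)` in both the plain and the 
`.partial(...)` operator.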
   
   task_1 (without dynamic task mapping) completes successfully, while 
task_2 (with dynamic task mapping) fails.
   
   Looking at the error logs, pod creation failed because the calls to 
`pod_cpu()` and `pod_mem()` were left unrendered in the Pod spec.
   
   Here is the traceback:
   
   ```
   Exception when attempting to create Namespaced Pod:
   {
     "apiVersion": "v1",
     "kind": "Pod",
     "metadata": {
       "annotations": {},
       "labels": {
         "dag_id": "sample_dag",
         "task_id": "task_2",
         "run_id": "manual__2023-02-08T183926.890852Z-eee90e4ee",
         "kubernetes_pod_operator": "True",
         "map_index": "0",
         "try_number": "2",
         "airflow_version": "2.4.3-composer",
         "airflow_kpo_in_cluster": "False"
       },
       "name": "task-2-46f76eb0432d42ae9a331a6fc53835b3",
       "namespace": "default"
     },
     "spec": {
       "affinity": {},
       "containers": [
         {
           "args": ["echo hello"],
           "command": ["bash", "-cx"],
           "env": [],
           "envFrom": [],
           "image": "us.gcr.io/ams-e2e-testing/edw-dvt-tool",
           "imagePullPolicy": "Always",
           "name": "base",
           "ports": [],
           "resources": {
             "limits": {
               "memory": "{{ pod_mem() }}",
               "cpu": "{{ pod_cpu() }}"
             }
           },
           "volumeMounts": []
         }
       ],
       "hostNetwork": false,
       "imagePullSecrets": [],
       "initContainers": [],
       "nodeSelector": {},
       "restartPolicy": "Never",
       "securityContext": {},
       "serviceAccountName": "sa-k8s",
       "tolerations": [],
       "volumes": []
     }
   }
   Traceback (most recent call last):
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 143, in run_pod_async
       resp = self._client.create_namespaced_pod(
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
       return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
       return self.api_client.call_api(
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
       return self.__call_api(resource_path, method,
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
       response_data = self.request(
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
       return self.rest_client.POST(url,
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
       return self.request("POST", url,
     File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
       raise ApiException(http_resp=r)
   kubernetes.client.exceptions.ApiException: (400)
   Reason: Bad Request
   HTTP response headers: HTTPHeaderDict({'Audit-Id': '1ef20c0b-6980-4173-b9cc-9af5b4792e86', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1b263a21-4c75-4ef8-8147-c18780a13f0e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3cd4cda4-908c-4944-a422-5512b0fb88d6', 'Date': 'Wed, 08 Feb 2023 18:45:23 GMT', 'Content-Length': '256'})
   HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
   ```
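   The 400 rejection can be reproduced locally with the quantity regex quoted 
in the response body above: a rendered value like `4000M` matches, while the 
raw, unrendered template string does not (a stdlib-only sketch):

   ```python
   import re

   # Quantity pattern taken verbatim from the API server's 400 response body
   quantity_re = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")

   print(bool(quantity_re.match("4000M")))            # rendered value: accepted
   print(bool(quantity_re.match("{{ pod_mem() }}")))  # raw template string: rejected
   ```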
   
   
   
   ### Operating System
   
   Google Composer Kubernetes Cluster
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
