hussein-awala commented on issue #29432:
URL: https://github.com/apache/airflow/issues/29432#issuecomment-1437570393

   @jose-lpa using `Variable.get()` in the DAG script is not recommended,
because the `DagFileProcessor` re-parses the script every X minutes and loads
the variable from the DB on each parse. Also, this solution works with
`Variable` but not with the other Jinja templates.
   
   @pshrivastava27 here is a solution that should fit your need:
   ```python
   import datetime
   import os
   
   from airflow import models
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
       KubernetesPodOperator,
   )
   from kubernetes.client import models as k8s_models
   
   dvt_image = os.environ.get("DVT_IMAGE", "dev")
   
   default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}
   
   
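   class PatchedResourceRequirements(k8s_models.V1ResourceRequirements):
       # Exposing template_fields lets Airflow's template rendering descend
       # into this object and render its limits and requests.
       template_fields = ("limits", "requests")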
   
   
   def pod_mem():
       return "4000M"
   
   
   def pod_cpu():
       return "1000m"
   
   
   with models.DAG(
       "sample_dag",
       schedule_interval=None,
       default_args=default_dag_args,
       render_template_as_native_obj=True,
       user_defined_macros={
           "pod_mem": pod_mem,
           "pod_cpu": pod_cpu,
       },
   ) as dag:
   
       task_1 = KubernetesPodOperator(
           task_id="task_1",
           name="task_1",
           namespace="default",
           image=dvt_image,
           cmds=["bash", "-cx"],
           arguments=["echo hello"],
           service_account_name="sa-k8s",
           container_resources=PatchedResourceRequirements(
               limits={
                   "memory": "{{ pod_mem() }}",
                   "cpu": "{{ pod_cpu() }}",
               }
           ),
           startup_timeout_seconds=1800,
           get_logs=True,
           image_pull_policy="Always",
           config_file="/home/airflow/composer_kube_config",
           dag=dag,
       )
   
       task_2 = KubernetesPodOperator.partial(
           task_id="task_2",
           name="task_2",
           namespace="default",
           image=dvt_image,
           cmds=["bash", "-cx"],
           service_account_name="sa-k8s",
           container_resources=PatchedResourceRequirements(
               limits={
                   "memory": "{{ pod_mem() }}",
                   "cpu": "{{ pod_cpu() }}",
               }
           ),
           startup_timeout_seconds=1800,
           get_logs=True,
           image_pull_policy="Always",
           config_file="/home/airflow/composer_kube_config",
           dag=dag,
       ).expand(arguments=[["echo hello"]])
   
       task_1 >> task_2
   ``` 
   
   You can patch the other Kubernetes model classes the same way if needed.
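
To illustrate why adding `template_fields` to the subclass is enough, here is a simplified, standard-library-only model of nested template rendering. It is a sketch, not Airflow's actual code: `render`, `render_string`, `Resources`, and `PatchedResources` are hypothetical names, and a small regex stands in for Jinja. The assumption it mirrors is that the renderer recurses into dicts and lists, and into arbitrary objects only when they declare `template_fields`.

```python
import re

# Hypothetical stand-in for Jinja: matches "{{ name() }}" so the call can be
# replaced with the result of macros[name]().
_CALL = re.compile(r"\{\{\s*(\w+)\(\)\s*\}\}")


def render_string(text, macros):
    """Replace every "{{ name() }}" in text with str(macros[name]())."""
    return _CALL.sub(lambda m: str(macros[m.group(1)]()), text)


def render(value, macros):
    """Recursively render strings, dicts, lists, and opted-in objects."""
    if isinstance(value, str):
        return render_string(value, macros)
    if isinstance(value, dict):
        return {k: render(v, macros) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, macros) for v in value]
    # Objects opt in by exposing template_fields; anything else is
    # returned untouched.
    for field in getattr(value, "template_fields", ()):
        setattr(value, field, render(getattr(value, field), macros))
    return value


class Resources:
    """Stand-in for a model class that declares no template_fields."""

    def __init__(self, limits=None, requests=None):
        self.limits = limits
        self.requests = requests


class PatchedResources(Resources):
    """Same data, but opted in to rendering."""

    template_fields = ("limits", "requests")


macros = {"pod_mem": lambda: "4000M", "pod_cpu": lambda: "1000m"}
limits = {"memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}"}

plain = render(Resources(limits=dict(limits)), macros)
patched = render(PatchedResources(limits=dict(limits)), macros)

print(plain.limits)    # template strings are left unrendered
print(patched.limits)  # template strings are replaced by macro results
```

In this model the unpatched object keeps its raw template strings, while the patched one has them rendered, which is the behaviour the subclass in the DAG above relies on.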


