pshrivastava27 commented on issue #29432:
URL: https://github.com/apache/airflow/issues/29432#issuecomment-1437887986
> @jose-lpa using `Variable.get()` in the dag script is not recommended
because the `DagFileProcessor` process the script each X minutes, and it loads
this variable from the DB. Also this solution works with Variable but not all
the other jinja templates.
>
> @pshrivastava27 here is a solution for your need
>
> ```python
> import datetime
> import os
>
> from airflow import models
> from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
> KubernetesPodOperator,
> )
> from kubernetes.client import models as k8s_models
>
> dvt_image = os.environ.get("DVT_IMAGE", "dev")
>
> default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}
>
>
> class PatchedResourceRequirements(k8s_models.V1ResourceRequirements):
> template_fields = ("limits", "requests")
>
>
> def pod_mem():
> return "4000M"
>
>
> def pod_cpu():
> return "1000m"
>
>
> with models.DAG(
> "sample_dag",
> schedule_interval=None,
> default_args=default_dag_args,
> render_template_as_native_obj=True,
> user_defined_macros={
> "pod_mem": pod_mem,
> "pod_cpu": pod_cpu,
> },
> ) as dag:
>
> task_1 = KubernetesPodOperator(
> task_id="task_1",
> name="task_1",
> namespace="default",
> image=dvt_image,
> cmds=["bash", "-cx"],
> arguments=["echo hello"],
> service_account_name="sa-k8s",
> container_resources=PatchedResourceRequirements(
> limits={
> "memory": "{{ pod_mem() }}",
> "cpu": "{{ pod_cpu() }}",
> }
> ),
> startup_timeout_seconds=1800,
> get_logs=True,
> image_pull_policy="Always",
> config_file="/home/airflow/composer_kube_config",
> dag=dag,
> )
>
> task_2 = KubernetesPodOperator.partial(
> task_id="task_2",
> name="task_2",
> namespace="default",
> image=dvt_image,
> cmds=["bash", "-cx"],
> service_account_name="sa-k8s",
> container_resources=PatchedResourceRequirements(
> limits={
> "memory": "{{ pod_mem() }}",
> "cpu": "{{ pod_cpu() }}",
> }
> ),
> startup_timeout_seconds=1800,
> get_logs=True,
> image_pull_policy="Always",
> config_file="/home/airflow/composer_kube_config",
> dag=dag,
> ).expand(arguments=[["echo hello"]])
>
> task_1 >> task_2
> ```
>
> You can do the same for the other classes if needed.
Thanks for the help! @hussein-awala
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]