mfjackson opened a new issue #13522:
URL: https://github.com/apache/airflow/issues/13522


   I'm encountering an issue that I believe is similar to #13348, but I'm 
opening a new issue to describe what is happening in more detail.
   
   **Apache Airflow version**: 2.0.0
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.19.4
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: GCP, using GKE
   
   
   **What happened**:
   
   My DAG in Airflow 1.10.13 was using a couple of built-in variables and Jinja 
templating to pass information between `KubernetesPodOperator` tasks (e.g. file 
paths, execution dates); it looked similar to this:
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

   default_args = {'owner': 'airflow'}

   example_workflow = DAG('my_dag',
                          default_args=default_args,
                          default_view='graph',
                          start_date=datetime(2021, 1, 1),
                          schedule_interval='@daily')

   with example_workflow:
       example_k8s_task = KubernetesPodOperator(namespace='my_namespace',
                                                image="ubuntu:latest",
                                                name="pod1",
                                                task_id='my_task_id',
                                                env_vars={'TASK_ID': '{{ task.task_id }}'},
                                                cmds=["echo $TASK_ID"]
                                                )

       example_k8s_task
   ```
   
   which successfully echoed `'my_task_id'` in the logs when the DAG was 
triggered.
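
   For reference, Airflow's templating descends into plain strings, lists, and 
dicts, which is why the `'{{ task.task_id }}'` value inside the `env_vars` dict 
was rendered in 1.10. Here is a minimal sketch of that recursive behavior — a 
simplified model using `jinja2` directly, not Airflow's actual implementation; 
the `Task` class below is an illustrative stand-in for the real task object:

```python
from jinja2 import Environment

def render(value, context, env=Environment()):
    """Simplified model of Airflow's recursive template rendering:
    strings are rendered, lists and dicts are walked element by element."""
    if isinstance(value, str):
        return env.from_string(value).render(**context)
    if isinstance(value, list):
        return [render(v, context, env) for v in value]
    if isinstance(value, dict):
        return {k: render(v, context, env) for k, v in value.items()}
    return value  # anything else passes through untouched

class Task:
    # stand-in for the real task object exposed in the template context
    task_id = "my_task_id"

# The 1.10-style env_vars dict: its templated string value gets rendered.
print(render({"TASK_ID": "{{ task.task_id }}"}, {"task": Task()}))
# → {'TASK_ID': 'my_task_id'}
```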
   
   With the changes to the `KubernetesPodOperator` in Airflow 2.0, I followed 
the guidelines 
[here](https://airflow.apache.org/docs/apache-airflow/stable/upgrading-to-2.html#changed-parameters-for-the-kubernetespodoperator)
 and refactored the above code to look like this:
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
   from kubernetes.client import models as k8s

   default_args = {'owner': 'airflow'}

   my_env_vars = [
       k8s.V1EnvVar(
           name='TASK_ID',
           value='{{ task.task_id }}'
       )]

   example_workflow = DAG(dag_id='my_dag',
                          default_args=default_args,
                          default_view='graph',
                          start_date=datetime(2021, 1, 1),
                          schedule_interval='@daily')

   with example_workflow:
       example_k8s_task = KubernetesPodOperator(namespace='my_namespace',
                                                image="ubuntu:latest",
                                                name="pod1",
                                                task_id='my_task_id',
                                                env_vars=my_env_vars,
                                                cmds=["echo $TASK_ID"]
                                                )

       example_k8s_task
   ```
   
   However, this echoed `'{{ task.task_id }}'` in the logs, meaning the 
templated variable did not render properly.
   
   **What I expected to happen**:
   
   I expected the templated variable to be rendered, so that the task would 
echo its value, e.g. `'my_task_id'`.
   
   **What I think actually happened**:
   
   While the `KubernetesPodOperator` `env_vars` parameter is marked as 
"(templated)" in the [source 
code](https://github.com/apache/airflow/blob/9c75ea3c14b71d2f96d997aeef68c764c7d2984c/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L83),
 when a string is passed in the `value` parameter of a `kubernetes.client` 
`V1EnvVar` object ([source 
code](https://github.com/kubernetes-client/python/blob/b79ad6837b2f5326c7dad488a64eed7c3987e856/kubernetes/client/models/v1_env_var.py)),
 Airflow does not render the template, but rather treats it as a literal string.
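
   To illustrate the suspected failure mode, here is the same kind of 
simplified rendering sketch applied to a list of `V1EnvVar`-style objects. The 
`EnvVar` class below is a stand-in for `kubernetes.client.models.V1EnvVar` so 
the snippet runs without the kubernetes package, and the `render` function 
models the recursive templating behavior — it is not Airflow's actual code:

```python
from jinja2 import Environment

class EnvVar:
    """Stand-in for kubernetes.client.models.V1EnvVar."""
    def __init__(self, name, value):
        self.name = name
        self.value = value

def render(value, context, env=Environment()):
    # Simplified model of recursive templating: it walks strings,
    # lists, and dicts, but returns any other object unchanged.
    if isinstance(value, str):
        return env.from_string(value).render(**context)
    if isinstance(value, list):
        return [render(v, context, env) for v in value]
    if isinstance(value, dict):
        return {k: render(v, context, env) for k, v in value.items()}
    return value  # EnvVar instances hit this branch: .value is never rendered

class Task:
    # stand-in for the real task object exposed in the template context
    task_id = "my_task_id"

env_vars = [EnvVar(name="TASK_ID", value="{{ task.task_id }}")]
rendered = render(env_vars, {"task": Task()})
print(rendered[0].value)  # still the literal '{{ task.task_id }}'
```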
   
   **How to reproduce it**:
   
   Set up Airflow on minikube or use your cloud provider's Airflow instance (so 
easy to set up, I know /s) and run the second Python code snippet above (using 
Airflow 2.0).
   
   Thanks!

