frankbreetz opened a new issue #17686:
URL: https://github.com/apache/airflow/issues/17686


   
   **Apache Airflow version**: 2.1.1
   
   
   **OS**: Debian GNU/Linux 10
   
   
   **Apache Airflow Provider versions**: 
   
   
   **Deployment**: Astronomer
   
   
   **What happened**:
   When using the TaskFlow API to pass XComs between tasks, an unwanted dependency is created in the UI.
   <details><summary>XCom DAG using the TaskFlow API</summary>
   
   ```python
   from datetime import datetime, timedelta
   from airflow.decorators import dag
   from airflow.decorators import task
   
   def get_run_date():
       return '01-01-2021'
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5)
   }
   
   @dag(default_args=default_args,
        schedule_interval=None,
        start_date=datetime(2021, 8, 1),
        dagrun_timeout=timedelta(hours=4),
        max_active_runs=1)
   def xcomm_taskflow_dag():
       @task()
       def set_date():
           date ="1-2-21"
           return date
       @task()
       def xcomm_task2(date):
           print(f"xcomm_task2:{date}")
       @task()
       def xcomm_task3(date):
           print(f"xcomm_task3:{date}")
   
       set_date=set_date()
       task2=xcomm_task2(set_date)
       task3=xcomm_task3(set_date)
       set_date>>task2>>task3
   
   xcomm_taskflow_dag=xcomm_taskflow_dag()
   ```
   
   </details>
   
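   Because both downstream tasks use the value returned by the first task, each of them is connected directly to `set_date`. The Graph View therefore shows an extra `set_date` → `xcomm_task3` edge on top of the explicitly declared `set_date >> task2 >> task3` chain.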
   
   ![Screen Shot 2021-08-18 at 10 06 11 AM](https://user-images.githubusercontent.com/24467723/129924392-ed364261-6bd9-4597-b9b6-5be53d62ab6c.png)
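To make the mechanism concrete, here is a simplified, hypothetical model of how a TaskFlow-style API can infer edges: any task output passed as an argument becomes an upstream dependency, and `>>` adds explicit ones. It does not import Airflow, and `Graph`, `Node`, and `call` are illustrative names only, not Airflow classes. Applied to the structure of the DAG above, it produces the extra `set_date` → `xcomm_task3` edge.

```python
# Hypothetical, simplified model of TaskFlow-style dependency inference.
# It does not import Airflow; `Graph`, `Node`, and `call` are illustrative names.


class Node:
    def __init__(self, graph, task_id):
        self.graph = graph
        self.task_id = task_id

    def __rshift__(self, other):
        # Explicit dependency, like `set_date >> task2` in the DAG above.
        self.graph.edges.add((self.task_id, other.task_id))
        return other  # returning the right operand allows chaining: a >> b >> c


class Graph:
    def __init__(self):
        self.edges = set()  # a set, so an edge declared twice is stored once

    def call(self, task_id, *args):
        """Create a task; every Node passed as an argument becomes its upstream."""
        node = Node(self, task_id)
        for arg in args:
            if isinstance(arg, Node):
                self.edges.add((arg.task_id, task_id))
        return node


g = Graph()
set_date = g.call("set_date")
task2 = g.call("xcomm_task2", set_date)  # data dependency on set_date
task3 = g.call("xcomm_task3", set_date)  # data dependency on set_date
set_date >> task2 >> task3               # explicit chain

print(sorted(g.edges))
# [('set_date', 'xcomm_task2'), ('set_date', 'xcomm_task3'), ('xcomm_task2', 'xcomm_task3')]
```

In this model the `set_date` → `xcomm_task2` edge comes from both the argument and `>>`, while `set_date` → `xcomm_task3` comes only from the argument. In the classic version further down, `xcom_pull(task_ids="get_date")` is a string lookup at run time, so only the `>>` edges exist.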
   
   This also duplicates a task (`xcomm_task3`) in the Tree View:
   ![Screen Shot 2021-08-18 at 10 08 42 AM](https://user-images.githubusercontent.com/24467723/129924405-496e4688-81b3-43df-98ef-3210733374e1.png)
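A tree-shaped view has to repeat any task that has more than one upstream parent. The sketch below is a hypothetical approximation, assuming each task is drawn once under every one of its upstream tasks; it is not Airflow's view code, and `tree_lines` is an illustrative name. With the three edges from the TaskFlow DAG, `xcomm_task3` shows up twice.

```python
from collections import defaultdict


def tree_lines(edges, root):
    """Expand a DAG into indented tree lines, repeating a node under every parent.

    Hypothetical approximation of a tree-style view; not Airflow's code.
    """
    children = defaultdict(list)
    for upstream, downstream in sorted(edges):  # sorted for a stable order
        children[upstream].append(downstream)

    lines = []

    def walk(node, depth):
        lines.append("  " * depth + node)
        for child in children[node]:
            walk(child, depth + 1)

    walk(root, 0)
    return lines


taskflow_edges = {
    ("set_date", "xcomm_task2"),
    ("set_date", "xcomm_task3"),
    ("xcomm_task2", "xcomm_task3"),
}
for line in tree_lines(taskflow_edges, "set_date"):
    print(line)
# set_date
#   xcomm_task2
#     xcomm_task3
#   xcomm_task3
```

Dropping the `("set_date", "xcomm_task3")` edge, as in the classic DAG, leaves a single straight branch with each task listed once.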
   
   This is a simple example, but with complex DAGs these extra edges and duplicated tasks could quickly become unwieldy.
   
   
   **What you expected to happen**:
   If the same XCom exchange is implemented without the TaskFlow API, no unwanted dependency is created; only the dependencies declared explicitly with the bitshift operators appear.
   
   <details><summary>XCom DAG without the TaskFlow API</summary>
   
   
   ```python
   from airflow.operators.python import PythonOperator
   from datetime import datetime, timedelta
   from airflow.decorators import dag
   
   def get_run_date(ti):
       date= '01-01-2021'
       ti.xcom_push(key="date",value=date)
   def var_fun2(ti):
       date=ti.xcom_pull(key="date",task_ids="get_date")
       print(date)
   
   def var_fun3(ti):
       date=ti.xcom_pull(key="date",task_ids="get_date")
       print(date)
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5)
   }
   
   @dag(default_args=default_args,
        schedule_interval=None,
        start_date=datetime(2021, 8, 1),
        dagrun_timeout=timedelta(hours=4),
        max_active_runs=1)
   def xcomm_dag():
       # In Airflow 2 the task context (including `ti`) is passed to the
       # callable automatically, so `provide_context=True` is no longer needed.
       set_date = PythonOperator(
           task_id="get_date",
           python_callable=get_run_date,
       )
   
       var_task2 = PythonOperator(
           task_id="xcomm_task2",
           python_callable=var_fun2,
       )
   
       var_task3 = PythonOperator(
           task_id="xcomm_task3",
           python_callable=var_fun3,
       )
   
       set_date >> var_task2 >> var_task3
   
   xcomm_dag=xcomm_dag()
   ```
   
   </details>
   Here the Graph View shows only the explicitly declared chain, with no direct edge from `get_date` to `xcomm_task3`:
   
   ![Screen Shot 2021-08-18 at 10 14 13 AM](https://user-images.githubusercontent.com/24467723/129924659-fd7e9172-e5ad-40d4-bfe9-f5e8bcab21a5.png)
   
   There are no duplicated tasks in the Tree View:
   
   ![Screen Shot 2021-08-18 at 10 14 31 AM](https://user-images.githubusercontent.com/24467723/129924667-f52bf04c-5271-4444-aeb2-9ed6fcdd941e.png)
   
   
   **How to reproduce it**:
   Upload the two DAGs above and open them in the Graph View and Tree View.
   
   **Anything else we need to know**: I understand why it might be designed this way: there is not only an indirect downstream dependency but also a direct data dependency between the first task and each downstream task. The classic operator approach did not create this dependency, while the TaskFlow API does. This may be more of a feature request, for example a config setting to disable these extra dependencies, to not derive dependencies from TaskFlow arguments, or at least to not show them in the UI. It would be nice to have code as clear as in the TaskFlow API example together with a UI as clear as in the non-TaskFlow API example.
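One possible UI-only way to "not have them shown" is to draw the transitive reduction of the graph: hide an edge when its downstream task is still reachable from its upstream task through another path. The function below is a hypothetical sketch, not an existing Airflow setting or API, and it assumes the edge set is acyclic (as every DAG is). Scheduling would still need the full edge set, because with non-default trigger rules (for example `one_success`) a hidden direct edge can still change when a task runs.

```python
from collections import defaultdict


def transitive_reduction(edges):
    """Return the edges of an acyclic graph that are not implied by longer paths.

    An edge (u, v) is dropped when v can still be reached from u without it.
    Hypothetical display helper; not part of Airflow.
    """
    children = defaultdict(set)
    for upstream, downstream in edges:
        children[upstream].add(downstream)

    def reachable_without(start, target):
        # Depth-first search from `start` that ignores the direct edge start -> target.
        stack = [child for child in children[start] if child != target]
        seen = set(stack)
        while stack:
            node = stack.pop()
            if node == target:
                return True
            for child in children[node]:
                if child not in seen:
                    seen.add(child)
                    stack.append(child)
        return False

    return {(u, v) for u, v in edges if not reachable_without(u, v)}


taskflow_edges = {
    ("set_date", "xcomm_task2"),
    ("set_date", "xcomm_task3"),
    ("xcomm_task2", "xcomm_task3"),
}
print(sorted(transitive_reduction(taskflow_edges)))
# [('set_date', 'xcomm_task2'), ('xcomm_task2', 'xcomm_task3')]
```

The reduced edge set is exactly the chain declared with `>>`, which is what the non-TaskFlow Graph View displays.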
   
   
   **Are you willing to submit a PR?**
   
   
   


-- 
This is an automated message from the Apache Git Service.