frankbreetz opened a new issue #17686:
URL: https://github.com/apache/airflow/issues/17686
<!--
Welcome to Apache Airflow!
Please complete the next sections or the issue will be closed.
-->
**Apache Airflow version**:2.1.1
<!-- AIRFLOW VERSION IS MANDATORY -->
**OS**:Debian GNU/Linux 10
<!-- MANDATORY! You can get it via `cat /etc/oss-release` for example -->
**Apache Airflow Provider versions**:
<!-- You can use `pip freeze | grep apache-airflow-providers` (you can leave
only relevant ones)-->
**Deployment**:Astronomer
<!-- e.g. Virtualenv / VM / Docker-compose / K8S / Helm Chart / Managed
Airflow Service -->
<!-- Please include your deployment tools and versions: docker-compose, k8s,
helm, etc -->
**What happened**:
When Using the TaskFlow API To implement XComms, you can get a unwanted
dependency created in the UI
<details><summary> XCOMM DAG using TaskFlow API</summary>
```
from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.decorators import task
def get_run_date():
return '01-01-2021'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
@dag (default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 1),
dagrun_timeout=timedelta(hours=4),
max_active_runs=1)
def xcomm_taskflow_dag():
@task()
def set_date():
date ="1-2-21"
return date
@task()
def xcomm_task2(date):
print(f"xcomm_task2:{date}")
@task()
def xcomm_task3(date):
print(f"xcomm_task3:{date}")
set_date=set_date()
task2=xcomm_task2(set_date)
task3=xcomm_task3(set_date)
set_date>>task2>>task3
xcomm_taskflow_dag=xcomm_taskflow_dag()
```
</details>
Because both downstream tasks use the varible created by the first task
there is a direct connection from each of the task to the first task.

This also duplicates tasks in the Tree View (xcomm_task3)

This is a simple example, but one can imagine it could become quite unwieldy
with complex DAGs
<!-- Please include exact error messages if you can -->
**What you expected to happen**:
If we create an XCOMM without using the TaskFlow API, there is no unwanted
dependency created, just the one explicitly stated using the bit shift
operators
<details><summary> XCOMM DAG without Using TaskFlow API</summary>
```
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow.decorators import dag
def get_run_date(ti):
date= '01-01-2021'
ti.xcom_push(key="date",value=date)
def var_fun2(ti):
date=ti.xcom_pull(key="date",task_ids="get_date")
print(date)
def var_fun3(ti):
date=ti.xcom_pull(key="date",task_ids="get_date")
print(date)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
@dag (default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 1),
dagrun_timeout=timedelta(hours=4),
max_active_runs=1)
def xcomm_dag():
set_date=PythonOperator(
task_id="get_date",
python_callable=get_run_date,
provide_context=True
)
var_task2= PythonOperator(
task_id="xcomm_task2",
python_callable = var_fun2,
provide_context=True
)
var_task3= PythonOperator(
task_id="xcomm_task3",
python_callable = var_fun3,
provide_context=True
)
set_date >> var_task2 >> var_task3
xcomm_dag=xcomm_dag()
```
</details>
No there is no drect connection to downstream tasks in the Graph View

There are no duplicated tasks in the Tree View

<!-- What do you think went wrong? -->
**How to reproduce it**:
Upload the Included DAGs and View in the UI
<!--
As minimally and precisely as possible. Keep in mind we do not have access
to your cluster or dags.
If this is a UI bug, please provide a screenshot of the bug or a link to a
youtube video of the bug in action
You can include images/screen-casts etc. by drag-dropping the image here.
-->
**Anything else we need to know**: I understand how this could be designed
this way as there is not only a downstream dependency, but direct dependency
between the first DAG and downstream DAG. The old version did not create this
dependency and the new TaskFlow API does. This may be feature request along the
lines of Config setting to disable downstream dependencies or not create a
dependency based of the TaskFlow API, or at least not have them shown in the
UI. It would be nice to have clearer code like in the TaskFlow API example and
also clearer UI like in the non-TaskFlow API example.
<!--
How often does this problem occur? Once? Every time etc?
Any relevant logs to include? Put them here inside fenced
``` ``` blocks or inside a foldable details tag if it's long:
<details><summary>x.log</summary> lots of stuff </details>
-->
**Are you willing to submit a PR?**
<!---
This is absolutely not required, but we are happy to guide you in
contribution process
especially if you already have a good understanding of how to implement the
fix.
Airflow is a community-managed project and we love to bring new contributors
in.
Find us in #airflow-how-to-pr on Slack!
-->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]