le-chartreux opened a new issue, #40196: URL: https://github.com/apache/airflow/issues/40196
### Apache Airflow version

2.9.2

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

Hello Airflow team,

I found something I consider to be a bug, or at least unexpected behavior.

When I try to set a task_group `my_task_group` as the downstream of a task `start_task`:

- If `my_task_group` **doesn't return anything**, the downstream is set to the **first** task of `my_task_group` (expected behavior).
- If `my_task_group` **returns something**, the downstream is set to the task of `my_task_group` **that returns this value** (unexpected behavior).

## Examples:

### Without return value

In this case, `start` is linked to `task_1_of_group` (expected behavior).

#### Code

```python
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_without_return_value() -> None:
    """Downstream to the first task of a taskflow group that doesn't return a value."""
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    start_task >> my_task_group() >> end_task


@task_group
def my_task_group() -> None:
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    t1 >> t2
    # no return


downstream_to_first_task_of_group_without_return_value()
```

### With return value

In this case, `start` is linked to `task_2_of_group` (because `task_2_of_group` is returned by the task group).

#### Code

```python
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_with_return_value_not_working() -> None:
    """Try to downstream to the first task of a taskflow group that returns a value.

    Downstreaming to the first task of a group when the group returns a value does not
    work as expected when using the '>>' operator. Indeed, a group that returns a value
    will return the task that produces the value, so trying to downstream a task to the
    group will link it to the returned task of the group.

    This dag shows this issue.
    """
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    my_task_group_result = my_task_group()
    start_task >> my_task_group_result >> end_task


@task_group
def my_task_group():
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    t1 >> t2
    return t2


downstream_to_first_task_of_group_with_return_value_not_working()
```

## Workaround

A workaround I found is to use an EmptyOperator as an entrypoint. The procedure is to set it as the downstream of the task before the group, then pass it to the group as a parameter and set it as the upstream of the first task of the group.

### Code

```python
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_with_return_value() -> None:
    """Downstream to the first task of a taskflow group that returns a value.

    Downstreaming to the first task of a group when the group returns a value does not
    work as expected when using the '>>' operator. Indeed, a group that returns a value
    will return the task that produces the value, so trying to downstream a task to the
    group will link it to the returned task of the group.

    To work around the problem, it is possible to use an EmptyOperator as entrypoint.
    """
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    entrypoint_my_task_group = EmptyOperator(task_id="entrypoint_my_task_group")
    my_task_group_result = my_task_group(entrypoint_my_task_group)
    start_task >> entrypoint_my_task_group
    my_task_group_result >> end_task


@task_group
def my_task_group(entrypoint_my_task_group):
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    entrypoint_my_task_group >> t1 >> t2
    return t2


downstream_to_first_task_of_group_with_return_value()
```

Thank you for your work, Airflow is awesome!

Best regards,
Nathan Rousseau, A.K.A le-chartreux

### What you think should happen instead?

The behavior of setting the downstream to a `task_group` should not change whether or not this `task_group` returns something. As a user, I expect it to always be set to the first task of the `task_group`.

### How to reproduce

Copy/paste the code of the following DAGs (especially the one with a return value, since it is the one with the unexpected behavior).

## Without return value

In this case, `start` is linked to `task_1_of_group` (expected behavior).
### Code

```python
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_without_return_value() -> None:
    """Downstream to the first task of a taskflow group that doesn't return a value."""
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    start_task >> my_task_group() >> end_task


@task_group
def my_task_group() -> None:
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    t1 >> t2
    # no return


downstream_to_first_task_of_group_without_return_value()
```

## With return value

In this case, `start` is linked to `task_2_of_group` (because `task_2_of_group` is returned by the task group).

### Code

```python
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_with_return_value_not_working() -> None:
    """Try to downstream to the first task of a taskflow group that returns a value.

    Downstreaming to the first task of a group when the group returns a value does not
    work as expected when using the '>>' operator. Indeed, a group that returns a value
    will return the task that produces the value, so trying to downstream a task to the
    group will link it to the returned task of the group.

    This dag shows this issue.
    """
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    my_task_group_result = my_task_group()
    start_task >> my_task_group_result >> end_task


@task_group
def my_task_group():
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    t1 >> t2
    return t2


downstream_to_first_task_of_group_with_return_value_not_working()
```

## Workaround

A workaround I found is to use an EmptyOperator as an entrypoint. The procedure is to set it as the downstream of the task before the group, then pass it to the group as a parameter and set it as the upstream of the first task of the group.

### Code

```python
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_with_return_value() -> None:
    """Downstream to the first task of a taskflow group that returns a value.

    Downstreaming to the first task of a group when the group returns a value does not
    work as expected when using the '>>' operator. Indeed, a group that returns a value
    will return the task that produces the value, so trying to downstream a task to the
    group will link it to the returned task of the group.

    To work around the problem, it is possible to use an EmptyOperator as entrypoint.
    """
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    entrypoint_my_task_group = EmptyOperator(task_id="entrypoint_my_task_group")
    my_task_group_result = my_task_group(entrypoint_my_task_group)
    start_task >> entrypoint_my_task_group
    my_task_group_result >> end_task


@task_group
def my_task_group(entrypoint_my_task_group):
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    entrypoint_my_task_group >> t1 >> t2
    return t2


downstream_to_first_task_of_group_with_return_value()
```

### Operating System

Red Hat Enterprise Linux 8.9 (Ootpa)

### Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.13.0
apache-airflow-providers-fab==1.1.0
apache-airflow-providers-ftp==3.9.0
apache-airflow-providers-http==4.11.0
apache-airflow-providers-imap==3.6.0
apache-airflow-providers-smtp==1.7.0
apache-airflow-providers-sqlite==3.8.0

### Deployment

Virtualenv installation

### Deployment details

I just used the standard pip install inside a venv.

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
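### Simplified model of the reported behavior

The difference between the two DAGs can be illustrated without Airflow. The sketch below is a toy model, not Airflow's implementation: `Task`, `Group`, and `toy_task_group` are hypothetical names, and the model simply assumes that calling a decorated group function hands back the function's return value when there is one and the group itself otherwise, which is consistent with the behavior reported above.

```python
from __future__ import annotations

_current_group = None  # the Group being built, if any (toy global state)


class Node:
    """Anything that can appear on either side of `>>` in this toy model."""

    def roots(self) -> list[Task]:
        raise NotImplementedError

    def leaves(self) -> list[Task]:
        raise NotImplementedError

    def __rshift__(self, other: Node) -> Node:
        # Link every exit point of `self` to every entry point of `other`.
        for src in self.leaves():
            for dst in other.roots():
                src.downstream.append(dst.task_id)
                dst.upstream.append(src.task_id)
        return other  # returning the right-hand side allows chaining


class Task(Node):
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: list[str] = []
        self.downstream: list[str] = []
        if _current_group is not None:
            _current_group.tasks.append(self)

    def roots(self) -> list[Task]:
        return [self]

    def leaves(self) -> list[Task]:
        return [self]


class Group(Node):
    def __init__(self) -> None:
        self.tasks: list[Task] = []

    def roots(self) -> list[Task]:
        # Entry points: members with no upstream inside the group.
        members = {t.task_id for t in self.tasks}
        return [t for t in self.tasks if not members.intersection(t.upstream)]

    def leaves(self) -> list[Task]:
        # Exit points: members with no downstream inside the group.
        members = {t.task_id for t in self.tasks}
        return [t for t in self.tasks if not members.intersection(t.downstream)]


def toy_task_group(func):
    """Hypothetical decorator: return the function's value if any, else the group."""

    def factory(*args, **kwargs):
        global _current_group
        group = Group()
        _current_group = group
        try:
            retval = func(*args, **kwargs)
        finally:
            _current_group = None
        return retval if retval is not None else group

    return factory


@toy_task_group
def group_without_return():
    t1 = Task("task_1_of_group")
    t2 = Task("task_2_of_group")
    t1 >> t2


@toy_task_group
def group_with_return():
    t1 = Task("task_1_of_group")
    t2 = Task("task_2_of_group")
    t1 >> t2
    return t2


start_a = Task("start")
start_a >> group_without_return()  # right-hand side is the whole group

start_b = Task("start")
start_b >> group_with_return()  # right-hand side is only the returned task

print(start_a.downstream)  # ['task_1_of_group']
print(start_b.downstream)  # ['task_2_of_group']
```

In this model the `>>` operator never sees the group in the second case, only the returned task, so it has no way to find the group's first task. The entrypoint workaround sidesteps this by wiring the upstream edge inside the group function.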
