le-chartreux opened a new issue, #40196:
URL: https://github.com/apache/airflow/issues/40196

   ### Apache Airflow version
   
   2.9.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Hello Airflow team,
   
   I found something I consider to be a bug, or at least unexpected behavior.
   
   When I try to set a task group `my_task_group` as downstream of a task `start_task`:
   
   - If `my_task_group` **doesn't return anything**, `start_task` is linked to the **first** task of `my_task_group` (expected behavior).
   - If `my_task_group` **returns something**, `start_task` is linked to the task **returned by** `my_task_group` (unexpected behavior).
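Both bullet points can be reproduced without Airflow in a small pure-Python toy model. This is a sketch, not Airflow code: `Task`, `Group`, `task_group` and `ids` are stand-ins. The model assumes that `a >> b` links the leaves of `a` to the roots of `b` (a group's roots are its tasks with no upstream task inside the group), and that calling the decorated function returns its return value when it is not `None`, and the group otherwise. As far as I can tell, `airflow/decorators/task_group.py` applies that last rule in 2.9.

```python
_ACTIVE: "list[Group]" = []  # stack of toy groups whose function is currently running


class Node:
    """Toy dependency node (not Airflow's API): `a >> b` links a's leaves to b's roots."""

    def __rshift__(self, other: "Node") -> "Node":
        for up in self.leaves:
            for down in other.roots:
                up.downstream.add(down)
                down.upstream.add(up)
        return other  # returning `other` allows chaining: a >> b >> c


class Task(Node):
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: "set[Task]" = set()
        self.downstream: "set[Task]" = set()
        if _ACTIVE:  # created while a group function runs: join that group
            _ACTIVE[-1].members.append(self)

    @property
    def roots(self) -> "list[Task]":
        return [self]

    @property
    def leaves(self) -> "list[Task]":
        return [self]


class Group(Node):
    def __init__(self) -> None:
        self.members: "list[Task]" = []

    @property
    def roots(self) -> "list[Task]":
        # Members without an upstream task inside the group.
        inside = set(self.members)
        return [t for t in self.members if not (t.upstream & inside)]

    @property
    def leaves(self) -> "list[Task]":
        # Members without a downstream task inside the group.
        inside = set(self.members)
        return [t for t in self.members if not (t.downstream & inside)]


def task_group(func):
    """Toy decorator: run `func` inside a new group, then pick what to return."""

    def wrapper(*args, **kwargs):
        group = Group()
        _ACTIVE.append(group)
        try:
            retval = func(*args, **kwargs)
        finally:
            _ACTIVE.pop()
        # A non-None return value is forwarded, so `>>` binds to it, not to the group.
        return retval if retval is not None else group

    return wrapper


@task_group
def group_without_return() -> None:
    t1 = Task("task_1_of_group")
    t2 = Task("task_2_of_group")
    t1 >> t2


@task_group
def group_with_return() -> Task:
    t1 = Task("task_1_of_group")
    t2 = Task("task_2_of_group")
    t1 >> t2
    return t2


def ids(tasks) -> "list[str]":
    return sorted(t.task_id for t in tasks)


start_a, end_a = Task("start"), Task("end")
start_a >> group_without_return() >> end_a
print(ids(start_a.downstream))  # ['task_1_of_group']

start_b, end_b = Task("start"), Task("end")
start_b >> group_with_return() >> end_b
print(ids(start_b.downstream))  # ['task_2_of_group']
```

In both cases `end` ends up downstream of `task_2_of_group`; only the upstream side changes with the return value.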
   
   ## Examples:
   
   ### Without return value
   
   In this case, `start` is linked to `task_1_of_group` (expected behavior).
   
   #### Screenshot
   
   ![downstream to first task of group without return value](https://github.com/apache/airflow/assets/59126928/3ce9b892-1dc9-4398-8ee2-ac32e6d0bf80)
   
   #### Code
   
   ```python
   from airflow.decorators import dag, task_group
   from airflow.operators.empty import EmptyOperator
   from pendulum import datetime
   
   
   @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
   def downstream_to_first_task_of_group_without_return_value() -> None:
       """Set the first task of a TaskFlow group that doesn't return a value as downstream."""
       start_task = EmptyOperator(task_id='start')
       end_task = EmptyOperator(task_id='end')
       start_task >> my_task_group() >> end_task
   
   
   @task_group
   def my_task_group() -> None:
       t1 = EmptyOperator(task_id='task_1_of_group')
       t2 = EmptyOperator(task_id='task_2_of_group')
       t1 >> t2
       # no return
   
   
   downstream_to_first_task_of_group_without_return_value()
   ```
   
   ### With return value
   
   In this case, `start` is linked to `task_2_of_group` (because `task_2_of_group` is returned by the task group).
   
   
   #### Screenshot
   
   ![downstream to first task of group with return value not working](https://github.com/apache/airflow/assets/59126928/3c1e614f-a1ec-4283-9bb7-a0b787e87f83)
   
   #### Code
   
   ```python
   from airflow.decorators import dag, task_group
   from airflow.operators.empty import EmptyOperator
   from pendulum import datetime
   
   
   @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
   def downstream_to_first_task_of_group_with_return_value_not_working() -> None:
       """Try to set the first task of a TaskFlow group that returns a value as downstream.
       
       Setting a downstream dependency on the first task of a group with the '>>'
       operator does not work as expected when the group returns a value.
       Indeed, a group that returns a value returns the task that produces the value,
       so setting the group as downstream of a task links that task to the returned
       task of the group instead.
       This DAG shows the issue.
       """
       start_task = EmptyOperator(task_id='start')
       end_task = EmptyOperator(task_id='end')
       my_task_group_result = my_task_group()
       start_task >> my_task_group_result >> end_task
   
   
   @task_group
   def my_task_group():
       t1 = EmptyOperator(task_id='task_1_of_group')
       t2 = EmptyOperator(task_id='task_2_of_group')
       t1 >> t2
       return t2
   
   
   downstream_to_first_task_of_group_with_return_value_not_working()
   ```
   
   ## Workaround
   
   A workaround I found is to use an EmptyOperator as an entrypoint.
   The procedure is to set it downstream of the task that precedes the group, pass it to the group as a parameter, and set it upstream of the first task of the group.
   
   ### Screenshot
   
   ![downstream to first task of group with return value](https://github.com/apache/airflow/assets/59126928/608e53e6-5e30-4db6-8d57-e19b4485f4a5)
   
   ### Code
   
   ```python
   from airflow.decorators import dag, task_group
   from airflow.operators.empty import EmptyOperator
   from pendulum import datetime
   
   
   @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
   def downstream_to_first_task_of_group_with_return_value() -> None:
       """Set the first task of a TaskFlow group that returns a value as downstream.
   
       Setting a downstream dependency on the first task of a group with the '>>'
       operator does not work as expected when the group returns a value.
       Indeed, a group that returns a value returns the task that produces the value,
       so setting the group as downstream of a task links that task to the returned
       task of the group instead.
       To work around the problem, an EmptyOperator can be used as an entrypoint.
       """
       start_task = EmptyOperator(task_id='start')
       end_task = EmptyOperator(task_id='end')
       entrypoint_my_task_group = EmptyOperator(task_id="entrypoint_my_task_group")
       my_task_group_result = my_task_group(entrypoint_my_task_group)
       start_task >> entrypoint_my_task_group
       my_task_group_result >> end_task
   
   
   @task_group
   def my_task_group(entrypoint_my_task_group):
       t1 = EmptyOperator(task_id='task_1_of_group')
       t2 = EmptyOperator(task_id='task_2_of_group')
       entrypoint_my_task_group >> t1 >> t2
       return t2
   
   
   downstream_to_first_task_of_group_with_return_value()
   ```
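The entrypoint workaround can be sanity-checked without Airflow in a small pure-Python toy model. `Task`, `Group`, `task_group` and `ids` are stand-ins rather than Airflow classes, and the model assumes that `a >> b` links the leaves of `a` to the roots of `b`, and that a decorated group returns its function's return value when it is not `None`. Because `entrypoint >> t1` is declared explicitly inside the group, the returned task only matters on the downstream side.

```python
_ACTIVE: "list[Group]" = []  # stack of toy groups whose function is currently running


class Node:
    """Toy dependency node (not Airflow's API): `a >> b` links a's leaves to b's roots."""

    def __rshift__(self, other: "Node") -> "Node":
        for up in self.leaves:
            for down in other.roots:
                up.downstream.add(down)
                down.upstream.add(up)
        return other  # returning `other` allows chaining: a >> b >> c


class Task(Node):
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: "set[Task]" = set()
        self.downstream: "set[Task]" = set()
        if _ACTIVE:  # created while a group function runs: join that group
            _ACTIVE[-1].members.append(self)

    @property
    def roots(self) -> "list[Task]":
        return [self]

    @property
    def leaves(self) -> "list[Task]":
        return [self]


class Group(Node):
    def __init__(self) -> None:
        self.members: "list[Task]" = []

    @property
    def roots(self) -> "list[Task]":
        inside = set(self.members)
        return [t for t in self.members if not (t.upstream & inside)]

    @property
    def leaves(self) -> "list[Task]":
        inside = set(self.members)
        return [t for t in self.members if not (t.downstream & inside)]


def task_group(func):
    """Toy decorator: forward a non-None return value, else return the group."""

    def wrapper(*args, **kwargs):
        group = Group()
        _ACTIVE.append(group)
        try:
            retval = func(*args, **kwargs)
        finally:
            _ACTIVE.pop()
        return retval if retval is not None else group

    return wrapper


@task_group
def my_task_group(entrypoint: Task) -> Task:
    t1 = Task("task_1_of_group")
    t2 = Task("task_2_of_group")
    entrypoint >> t1 >> t2  # explicit link from the entrypoint to the first task
    return t2


def ids(tasks) -> "list[str]":
    return sorted(t.task_id for t in tasks)


start, end = Task("start"), Task("end")
entrypoint = Task("entrypoint_my_task_group")  # created outside the group
result = my_task_group(entrypoint)  # the forwarded return value: task_2_of_group
start >> entrypoint
result >> end

print(ids(start.downstream))       # ['entrypoint_my_task_group']
print(ids(entrypoint.downstream))  # ['task_1_of_group']
print(ids(end.upstream))           # ['task_2_of_group']
```

Here `start` reaches `task_1_of_group` through the entrypoint, while `end` still hangs off the returned `task_2_of_group`.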
   
   Thank you for your work, Airflow is awesome!
   Best regards,
   
   Nathan Rousseau, A.K.A le-chartreux
   
   
   
   ### What you think should happen instead?
   
   The behavior of setting a `task_group` as downstream should not depend on whether the `task_group` returns something.
   As a user, I expect the dependency to always be set to the first task of the `task_group`.
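One hypothetical way to get the expected behavior, sketched with a pure-Python toy model: the decorated call always returns the group, so `>>` binds to its first and last tasks, and the function's return value is kept on the group. `Task`, `Group`, `task_group_keeping_group`, `ids` and the `output` attribute are illustrative names only, not Airflow APIs; the model also assumes that `a >> b` links the leaves of `a` to the roots of `b`.

```python
_ACTIVE: "list[Group]" = []  # stack of toy groups whose function is currently running


class Node:
    """Toy dependency node (not Airflow's API): `a >> b` links a's leaves to b's roots."""

    def __rshift__(self, other: "Node") -> "Node":
        for up in self.leaves:
            for down in other.roots:
                up.downstream.add(down)
                down.upstream.add(up)
        return other  # returning `other` allows chaining: a >> b >> c


class Task(Node):
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: "set[Task]" = set()
        self.downstream: "set[Task]" = set()
        if _ACTIVE:  # created while a group function runs: join that group
            _ACTIVE[-1].members.append(self)

    @property
    def roots(self) -> "list[Task]":
        return [self]

    @property
    def leaves(self) -> "list[Task]":
        return [self]


class Group(Node):
    def __init__(self) -> None:
        self.members: "list[Task]" = []
        self.output = None  # hypothetical slot for the group function's return value

    @property
    def roots(self) -> "list[Task]":
        inside = set(self.members)
        return [t for t in self.members if not (t.upstream & inside)]

    @property
    def leaves(self) -> "list[Task]":
        inside = set(self.members)
        return [t for t in self.members if not (t.downstream & inside)]


def task_group_keeping_group(func):
    """Hypothetical decorator: always return the group, keep the return value on it."""

    def wrapper(*args, **kwargs):
        group = Group()
        _ACTIVE.append(group)
        try:
            group.output = func(*args, **kwargs)
        finally:
            _ACTIVE.pop()
        return group

    return wrapper


@task_group_keeping_group
def my_task_group() -> Task:
    t1 = Task("task_1_of_group")
    t2 = Task("task_2_of_group")
    t1 >> t2
    return t2


def ids(tasks) -> "list[str]":
    return sorted(t.task_id for t in tasks)


start, end = Task("start"), Task("end")
group = my_task_group()
start >> group >> end
print(ids(start.downstream))  # ['task_1_of_group']
print(ids(end.upstream))      # ['task_2_of_group']
print(group.output.task_id)   # task_2_of_group
```

This only illustrates the expected dependency semantics; it does not account for existing DAGs that may rely on the forwarded return value, for example to pass data between tasks.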
   
   ### How to reproduce
   
   Copy/paste the DAGs shown in the "What happened?" section above, especially the one with a return value, since it's the one with the unexpected behavior.
   
   
   
   ### Operating System
   
   Red Hat Enterprise Linux 8.9 (Ootpa)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-sql==1.13.0
   apache-airflow-providers-fab==1.1.0
   apache-airflow-providers-ftp==3.9.0
   apache-airflow-providers-http==4.11.0
   apache-airflow-providers-imap==3.6.0
   apache-airflow-providers-smtp==1.7.0
   apache-airflow-providers-sqlite==3.8.0
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   I just used the standard pip install inside a venv.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.