chefmtt opened a new issue, #30883:
URL: https://github.com/apache/airflow/issues/30883

   ### Apache Airflow version
   
   2.5.3
   
   ### What happened
   
   Hello! 
   
   When using a branching operator in a mapped task group, a task skipped by the branch is skipped for all mapped instances of the task_group, not only for the instance that made the decision.
   
   Here is an example DAG exhibiting the issue.
   
![image](https://user-images.githubusercontent.com/11246353/234595433-6c1460b7-e808-4de1-9eb8-8b9fdb6f616c.png)
   
   When the BranchOperator sets a downstream task as "skipped", it also does so retroactively.
   If branch_a is selected and has time to run before branch_b is selected for the first time, it runs without issue. However, the status of that instance is still set to skipped afterwards. Any subsequent choice of "branch_a" will be skipped.
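To make the reported sequence concrete, here is a toy model in plain Python (no Airflow required). It is a hypothetical simplification inferred from the `Skipping tasks [...]` log lines, which list bare task ids with no map index; it is not Airflow's actual `skipmixin.py` code. Task-instance states are keyed by `(task_id, map_index)`, and `skip_by_task_id` (a hypothetical helper) marks every mapped instance of a task as skipped, which is enough to reproduce the retroactive skip described above.

```python
# Toy model of the behaviour reported in this issue; NOT Airflow's real code.
# Task-instance states are keyed by (task_id, map_index).
GROUP = "showcase_branching_issues"
BRANCH_A = f"{GROUP}.branch_a"
BRANCH_B = f"{GROUP}.branch_b"


def skip_by_task_id(states, task_id):
    """Hypothetical helper: mark task_id as skipped for EVERY map_index,
    ignoring which mapped task-group instance made the branch decision."""
    for tid, map_index in list(states):
        if tid == task_id:
            states[(tid, map_index)] = "skipped"


def replay_reported_sequence():
    # Two mapped task-group instances (map_index 0 and 1), nothing run yet.
    states = {(tid, i): "none" for tid in (BRANCH_A, BRANCH_B) for i in (0, 1)}

    # map_index=0 picks branch_a, so branch_b is skipped (for all map indexes)...
    skip_by_task_id(states, BRANCH_B)
    # ...and branch_a[0] then runs successfully.
    states[(BRANCH_A, 0)] = "success"

    # map_index=1 picks branch_b (after its 5 s sleep), so branch_a is skipped,
    # overwriting the already-successful branch_a[0]. branch_b[1] was already
    # skipped in the first step, so it never runs.
    skip_by_task_id(states, BRANCH_A)
    return states


if __name__ == "__main__":
    for key, state in sorted(replay_reported_sequence().items()):
        print(key, state)
```

Running it prints `skipped` for all four task instances, which matches the final state the reporter observed.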
    
   Logs for such a case are below (obtained using the DAG below).
   
   I am running Airflow v2.5.1.
   
   ### What you think should happen instead
   
   branch_a selected:
   ```log
   [2023-04-26, 13:58:09 UTC] {python.py:177} INFO - Done. Returned value was: showcase_branching_issues.branch_a
   [2023-04-26, 13:58:09 UTC] {python.py:211} INFO - Branch callable return showcase_branching_issues.branch_a
   [2023-04-26, 13:58:09 UTC] {skipmixin.py:155} INFO - Following branch showcase_branching_issues.branch_a
   [2023-04-26, 13:58:09 UTC] {skipmixin.py:211} INFO - Skipping tasks ['showcase_branching_issues.branch_b']
   [2023-04-26, 13:58:09 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=branching_issue, task_id=showcase_branching_issues.branch_int, map_index=0, execution_date=20230426T135806, start_date=20230426T135809, end_date=20230426T135809
   [2023-04-26, 13:58:09 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
   [2023-04-26, 13:58:09 UTC] {taskinstance.py:2578} INFO - 2 downstream tasks scheduled from follow-on schedule check
   ```
   
   branch_a running:
   ```log
   [2023-04-26, 13:58:10 UTC] {python.py:177} INFO - Done. Returned value was: None
   [2023-04-26, 13:58:10 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=branching_issue, task_id=showcase_branching_issues.branch_a, map_index=0, execution_date=20230426T135806, start_date=20230426T135810, end_date=20230426T135810
   [2023-04-26, 13:58:10 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
   [2023-04-26, 13:58:10 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   
   branch_b selected:
   ```log
   [2023-04-26, 13:58:14 UTC] {python.py:177} INFO - Done. Returned value was: showcase_branching_issues.branch_b
   [2023-04-26, 13:58:14 UTC] {python.py:211} INFO - Branch callable return showcase_branching_issues.branch_b
   [2023-04-26, 13:58:14 UTC] {skipmixin.py:155} INFO - Following branch showcase_branching_issues.branch_b
   [2023-04-26, 13:58:14 UTC] {skipmixin.py:211} INFO - Skipping tasks ['showcase_branching_issues.branch_a']
   [2023-04-26, 13:58:14 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=branching_issue, task_id=showcase_branching_issues.branch_int, map_index=1, execution_date=20230426T135806, start_date=20230426T135809, end_date=20230426T135814
   [2023-04-26, 13:58:14 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
   [2023-04-26, 13:58:14 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   
   All branch_a and branch_b instances were set to skipped, and no branch_b instance ran.
   
   ---
   
   Branch selection and "skipped" status should be relative to the particular task_group instance that made the branching decision.
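A sketch of the expected behaviour in plain Python, under the assumption that "relative to a task_group instance" means a skip applies only at the deciding instance's own `map_index`. This is not Airflow's API: `select_branch`, `skip_for_map_index` and `expected_states` are hypothetical names, and `select_branch` simply mirrors the `k % 2` rule of `branch_int` in the reproduction DAG below, without the `time.sleep`.

```python
# Sketch of the EXPECTED behaviour; hypothetical helpers, not Airflow code.
GROUP = "showcase_branching_issues"
BRANCHES = (f"{GROUP}.branch_a", f"{GROUP}.branch_b")


def select_branch(k):
    # Same rule as branch_int in the reproduction DAG (without the sleep).
    return f"{GROUP}.branch_a" if k % 2 == 0 else f"{GROUP}.branch_b"


def skip_for_map_index(states, task_id, map_index):
    # Skip only the task instance that belongs to the deciding group instance.
    states[(task_id, map_index)] = "skipped"


def expected_states(list_k):
    states = {}
    # One mapped task-group instance per element of list_k, numbered from 0.
    for map_index, k in enumerate(list_k):
        chosen = select_branch(k)
        for task_id in BRANCHES:
            if task_id == chosen:
                states[(task_id, map_index)] = "success"  # followed branch runs
            else:
                skip_for_map_index(states, task_id, map_index)
    return states
```

With scoped skips, the order in which instances finish no longer matters: `expected_states([0, 1])` leaves `branch_a` at map_index 0 and `branch_b` at map_index 1 as `success`, and only the other two instances as `skipped`.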
   
   
   ### How to reproduce
   
   Here is a minimal example DAG which showcases the issue:
   
   ```python
   from datetime import datetime
   
   from airflow.decorators import dag, task, task_group
   
   
   @dag(
       dag_id="branching_issue",
       schedule=None,
       start_date=datetime(2021, 1, 1),
   )
   def BranchingIssue():
       @task
       def branch_b():
           pass
   
       @task
       def branch_a():
           pass
   
       @task
       def initiate_dynamic_mapping():
           import random
   
           random_len = random.randint(1, 10)
           return [i for i in range(random_len)]
   
       @task.branch
       def branch_int(k):
           import time
   
           branch = "showcase_branching_issues."
           if k % 2 == 0:
               branch += "branch_a"
           else:
               time.sleep(5)
               branch += "branch_b"
           return branch
   
       @task_group
       def showcase_branching_issues(k):
           selected_branch = branch_int(k)
           selected_branch >> [branch_a(), branch_b()]
   
       list_k = initiate_dynamic_mapping()
       showcase_branching_issues.expand(k=list_k)
   
   
   dag = BranchingIssue()
   ```
   
   
   ### Operating System
   
   Ubuntu 22.04.1 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   I searched for related issues or fixes in newer/upcoming releases but found nothing; please excuse me if I missed something.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

