chefmtt opened a new issue, #30883: URL: https://github.com/apache/airflow/issues/30883
### Apache Airflow version

2.5.3

### What happened

Hello!

When using a branching operator in a mapped task group, skipped tasks will be skipped for all mapped instances of the task_group. An example DAG exhibiting the issue is included under "How to reproduce" below.

When the BranchOperator sets a downstream task as "skipped", it also does so retroactively. If branch_a is selected and has time to run before branch_b is first selected, it runs without issue. However, the status of that instance will still be set to skipped afterwards. Any subsequent choice of "branch_a" will be skipped as well.

Logs for such a case are shown below (obtained using the DAG below).

I am running Airflow v2.5.1.

### What you think should happen instead

branch_a selected:

```log
[2023-04-26, 13:58:09 UTC] {python.py:177} INFO - Done. Returned value was: showcase_branching_issues.branch_a
[2023-04-26, 13:58:09 UTC] {python.py:211} INFO - Branch callable return showcase_branching_issues.branch_a
[2023-04-26, 13:58:09 UTC] {skipmixin.py:155} INFO - Following branch showcase_branching_issues.branch_a
[2023-04-26, 13:58:09 UTC] {skipmixin.py:211} INFO - Skipping tasks ['showcase_branching_issues.branch_b']
[2023-04-26, 13:58:09 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=branching_issue, task_id=showcase_branching_issues.branch_int, map_index=0, execution_date=20230426T135806, start_date=20230426T135809, end_date=20230426T135809
[2023-04-26, 13:58:09 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2023-04-26, 13:58:09 UTC] {taskinstance.py:2578} INFO - 2 downstream tasks scheduled from follow-on schedule check
```

branch_a running:

```log
[2023-04-26, 13:58:10 UTC] {python.py:177} INFO - Done. Returned value was: None
[2023-04-26, 13:58:10 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=branching_issue, task_id=showcase_branching_issues.branch_a, map_index=0, execution_date=20230426T135806, start_date=20230426T135810, end_date=20230426T135810
[2023-04-26, 13:58:10 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2023-04-26, 13:58:10 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

branch_b selected:

```log
[2023-04-26, 13:58:14 UTC] {python.py:177} INFO - Done. Returned value was: showcase_branching_issues.branch_b
[2023-04-26, 13:58:14 UTC] {python.py:211} INFO - Branch callable return showcase_branching_issues.branch_b
[2023-04-26, 13:58:14 UTC] {skipmixin.py:155} INFO - Following branch showcase_branching_issues.branch_b
[2023-04-26, 13:58:14 UTC] {skipmixin.py:211} INFO - Skipping tasks ['showcase_branching_issues.branch_a']
[2023-04-26, 13:58:14 UTC] {taskinstance.py:1318} INFO - Marking task as SUCCESS. dag_id=branching_issue, task_id=showcase_branching_issues.branch_int, map_index=1, execution_date=20230426T135806, start_date=20230426T135809, end_date=20230426T135814
[2023-04-26, 13:58:14 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2023-04-26, 13:58:14 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

All branch_a and branch_b instances were set to skipped, and no branch_b instance ran.

---

Branch selection and "skipped" status should be relative to a particular task_group instance.
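
To make the expected behaviour concrete, here is a small pure-Python sketch (no Airflow imports) contrasting skip state that is shared by all mapped instances of a task with skip state that is tracked per `map_index`. The names `choose`, `states_shared` and `states_per_instance` are hypothetical and only model the symptom described above. They are not Airflow's actual implementation, and the sketch ignores timing (in the real run, branch_a executes first and is marked skipped afterwards).

```python
# Hypothetical model of the symptom; none of these names are Airflow APIs.
TASKS = ("branch_a", "branch_b")


def choose(k):
    """Same rule as branch_int in the DAG: even k -> branch_a, odd k -> branch_b."""
    return "branch_a" if k % 2 == 0 else "branch_b"


def states_shared(ks):
    """Observed behaviour: a skip decided by one mapped instance hits all of them."""
    skipped = set()
    for k in ks:
        # Every task that was not chosen is skipped, regardless of map_index.
        skipped.update(t for t in TASKS if t != choose(k))
    return {
        (t, i): "skipped" if t in skipped else "success"
        for i, _ in enumerate(ks)
        for t in TASKS
    }


def states_per_instance(ks):
    """Expected behaviour: each map_index keeps its own branch decision."""
    return {
        (t, i): "success" if t == choose(k) else "skipped"
        for i, k in enumerate(ks)
        for t in TASKS
    }


if __name__ == "__main__":
    ks = [0, 1]  # map_index 0 picks branch_a, map_index 1 picks branch_b
    print(states_shared(ks))
    print(states_per_instance(ks))
```

With `ks = [0, 1]`, the shared model ends with every `(task, map_index)` pair skipped, which matches the final state reported above, while the per-instance model leaves `branch_a` at index 0 and `branch_b` at index 1 as successful.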
### How to reproduce

Here is a minimal example DAG which showcases the issue:

```python
from datetime import datetime

from airflow.decorators import dag, task, task_group


@dag(
    dag_id="branching_issue",
    schedule=None,
    start_date=datetime(2021, 1, 1),
)
def BranchingIssue():
    @task
    def branch_b():
        pass

    @task
    def branch_a():
        pass

    @task
    def initiate_dynamic_mapping():
        import random

        random_len = random.randint(1, 10)
        return [i for i in range(random_len)]

    @task.branch
    def branch_int(k):
        import time

        branch = "showcase_branching_issues."
        if k % 2 == 0:
            branch += "branch_a"
        else:
            time.sleep(5)
            branch += "branch_b"
        return branch

    @task_group
    def showcase_branching_issues(k):
        selected_branch = branch_int(k)
        selected_branch >> [branch_a(), branch_b()]

    list_k = initiate_dynamic_mapping()
    showcase_branching_issues.expand(k=list_k)


dag = BranchingIssue()
```

### Operating System

Ubuntu 22.04.1 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

I tried searching for related issues or fixes in newer/upcoming releases but found nothing, please excuse me if I missed something.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
