danielcham opened a new issue, #33446: URL: https://github.com/apache/airflow/issues/33446
### Apache Airflow version

2.6.3

### What happened

I am writing a DAG that transfers data from MSSQL to BigQuery. The part of the ETL process that actually fetches the data from MSSQL and moves it to BigQuery needs to be parallelized. I am trying to write it as a task group in which the first task moves data from MSSQL to GCS and the second task loads the file into BigQuery. For some odd reason, when I expand the task group it is automatically marked as `upstream_failed` at the very first moment the DAG is triggered. I have reproduced the bug with a simple DAG as well (provided below). I found a similar issue [here](https://github.com/apache/airflow/issues/27449), but the bug seems to persist even after configuring `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False`.

### What you think should happen instead

The task group should be dynamically expanded **after all upstream tasks have finished**, since `expand_kwargs` needs the previous task's output (a single-task comparison sketch is included under "Anything else" below).

### How to reproduce

```python
from datetime import timedelta

from airflow.decorators import dag, task, task_group
from airflow.operators.bash import BashOperator
from pendulum import datetime


@dag(
    dag_id="example_task_group_expansion",
    schedule="@once",
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": True,
        "email_on_retry": True,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    start_date=datetime(2023, 8, 1),
    catchup=False,
)
def example_dag():
    @task(task_id="TaskDistributer")
    def task_distributer():
        step = 10_000
        return [dict(interval_start=i, interval_end=i + step) for i in range(0, 100_000, step)]

    @task_group(group_id="tg1")
    def tg(interval_start, interval_end):
        task1 = BashOperator(
            task_id="task1",
            bash_command="echo $interval_start -- $interval_end",
            env={"interval_start": str(interval_start), "interval_end": str(interval_end)},
        )
        task2 = BashOperator(
            task_id="task2",
            bash_command="echo $interval_start -- $interval_end",
            env={"interval_start": str(interval_start), "interval_end": str(interval_end)},
        )
        task1 >> task2
        return task2

    tg.expand_kwargs(task_distributer())


example_dag()
```

### Operating System

macOS 13.4.1

### Versions of Apache Airflow Providers

No providers needed to reproduce.

### Deployment

Docker-Compose

### Deployment details

- Airflow image: apache/airflow:2.6.3-python3.9
- Executor: Celery
- Messaging queue: Redis
- Metadata DB: MySQL 5.7

### Anything else

The problem occurs every time.
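For contrast, the same fan-out expressed as a single mapped task (no task group) should wait for the distributer's XCom before expanding, which is the behaviour I would expect `tg1` to follow. This is only a minimal sketch; `single_task_expansion` and `to_env_kwargs` are hypothetical names, not part of the repro above:

```python
# Sketch only: the same fan-out as a single mapped task instead of a mapped
# task group. "to_env_kwargs" is a hypothetical helper that reshapes the
# intervals into BashOperator kwargs before expansion.
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from pendulum import datetime


@dag(schedule="@once", start_date=datetime(2023, 8, 1), catchup=False)
def single_task_expansion():
    @task
    def to_env_kwargs():
        step = 10_000
        return [
            {"env": {"interval_start": str(i), "interval_end": str(i + step)}}
            for i in range(0, 100_000, step)
        ]

    # expand_kwargs() on a plain operator defers expansion until the upstream
    # XCom exists; a mapped task group should behave the same way.
    BashOperator.partial(
        task_id="single_task",
        bash_command="echo $interval_start -- $interval_end",
    ).expand_kwargs(to_env_kwargs())


single_task_expansion()
```

If this single-task version expands correctly while the task-group version does not, that would point at mapped task groups specifically rather than at `expand_kwargs` itself.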
Here are some of the scheduler logs that may be relevant.

```
[2023-08-16T12:48:32.520+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1103} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]>
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-08-16T12:48:32.521+0000] {taskinstance.py:1103} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]>
[2023-08-16T12:48:32.524+0000] {abstractoperator.py:414} ERROR - Cannot expand <Task(BashOperator): tg1.task1> for run manual__2023-08-16T12:48:31.265119+00:00; missing upstream values: ['expand_kwargs() argument']
[2023-08-16T12:48:32.535+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-16T12:48:32.538+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0), upstream_task_ids={'tg1.task1'}
[2023-08-16T12:48:32.538+0000] {taskinstance.py:1093} DEBUG - Dependencies not met for <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0), upstream_task_ids={'tg1.task1'}
[2023-08-16T12:48:32.539+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
```

As can be seen from the logs, no upstream task is in a `done` state, yet the expanded task is set to `upstream_failed`.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
