danielcham opened a new issue, #33446:
URL: https://github.com/apache/airflow/issues/33446

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   I am writing a DAG that transfers data from MSSQL to BigQuery. The part of the ETL process that actually fetches the data from MSSQL and moves it to BQ needs to be parallelized.
   I am trying to write it as a task group where the first task moves data from MSSQL to GCS and the second task loads the file into BQ (roughly the shape sketched below).
   For some odd reason, the moment the DAG is triggered, the expanded task group is automatically marked as `upstream_failed`.
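   For context, the production task group looks roughly like the sketch below. This is illustrative only: the bucket, table, SQL, and file names are placeholders, and I have trimmed it to just the two transfer steps.

   ```
   from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
   from airflow.providers.google.cloud.transfers.mssql_to_gcs import MSSQLToGCSOperator


   @task_group(group_id="transfer_interval")
   def transfer_interval(interval_start, interval_end):
       # Step 1: export one interval of rows from MSSQL into a JSON file in GCS.
       to_gcs = MSSQLToGCSOperator(
           task_id="mssql_to_gcs",
           sql="SELECT * FROM my_table WHERE id >= %(start)s AND id < %(end)s",
           parameters={"start": interval_start, "end": interval_end},
           bucket="my-staging-bucket",
           filename="exports/{{ run_id }}/part_{{ ti.map_index }}.json",
           export_format="json",
       )

       # Step 2: load that file from GCS into BigQuery.
       to_bq = GCSToBigQueryOperator(
           task_id="gcs_to_bq",
           bucket="my-staging-bucket",
           source_objects=["exports/{{ run_id }}/part_{{ ti.map_index }}.json"],
           destination_project_dataset_table="my_project.my_dataset.my_table",
           source_format="NEWLINE_DELIMITED_JSON",
           write_disposition="WRITE_APPEND",
       )

       to_gcs >> to_bq
   ```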
   
   I have tested this with a simple DAG (provided below) as well, and the bug was reproduced.
   
   I found a similar issue [here](https://github.com/apache/airflow/issues/27449), but the bug seems to persist even after configuring `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False`.
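   For reference, I set that flag through the container environment. Here is an excerpt of my compose file (it follows the layout of the stock Airflow Docker Compose file, so the anchor names come from there):

   ```
   # docker-compose.yaml (excerpt)
   x-airflow-common:
     &airflow-common
     image: apache/airflow:2.6.3-python3.9
     environment:
       &airflow-common-env
       AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: 'False'
   ```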
   
   ### What you think should happen instead
   
   The task group should be dynamically expanded **after all upstream tasks have finished**, since `expand_kwargs` needs the previous task's output.
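   For comparison, a hypothetical single-task equivalent of the group (reusing `task_distributer` from the repro below) illustrates the behavior I would expect: no mapped instance runs until `TaskDistributer` has pushed its output.

   ```
   # Hypothetical single-task equivalent of the task group: the scheduler should
   # defer expansion until task_distributer() has returned its list of kwargs.
   @task
   def process_interval(interval_start, interval_end):
       print(interval_start, "--", interval_end)

   process_interval.expand_kwargs(task_distributer())
   ```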
   
   ### How to reproduce
   
   ```
   from datetime import timedelta

   from airflow.decorators import dag, task, task_group
   from airflow.operators.bash import BashOperator
   from pendulum import datetime


   @dag(
       dag_id="example_task_group_expansion",
       schedule="@once",
       default_args={
           "depends_on_past": False,
           "email": ["[email protected]"],
           "email_on_failure": True,
           "email_on_retry": True,
           "retries": 0,
           "retry_delay": timedelta(minutes=5),
       },
       start_date=datetime(2023, 8, 1),
       catchup=False,
   )
   def example_dag():
       @task(task_id="TaskDistributer")
       def task_distributer():
           step = 10_000
           return [dict(interval_start=i, interval_end=i + step) for i in range(0, 100_000, step)]

       @task_group(group_id="tg1")
       def tg(interval_start, interval_end):
           task1 = BashOperator(
               task_id="task1",
               bash_command="echo $interval_start -- $interval_end",
               env={"interval_start": str(interval_start), "interval_end": str(interval_end)},
           )

           task2 = BashOperator(
               task_id="task2",
               bash_command="echo $interval_start -- $interval_end",
               env={"interval_start": str(interval_start), "interval_end": str(interval_end)},
           )

           task1 >> task2

           return task2

       tg.expand_kwargs(task_distributer())


   example_dag()
   ```
   
   ### Operating System
   
   macOS 13.4.1
   
   ### Versions of Apache Airflow Providers
   
   No providers needed to reproduce
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker-compose
   
   Airflow image: apache/airflow:2.6.3-python3.9
   Executor: Celery
   Messaging queue: redis
   Metadata DB: MySQL 5.7
   
   ### Anything else
   
   The problem occurs every time.
   
   Here are some of the scheduler logs that may be relevant.
   ```
   [2023-08-16T12:48:32.520+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1103} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: example_task_group_expansion.TaskDistributer manual__2023-08-16T12:48:31.265119+00:00 [None]>
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2023-08-16T12:48:32.521+0000] {taskinstance.py:1103} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: example_task_group_expansion.tg1.task1 manual__2023-08-16T12:48:31.265119+00:00 [None]>
   [2023-08-16T12:48:32.524+0000] {abstractoperator.py:414} ERROR - Cannot expand <Task(BashOperator): tg1.task1> for run manual__2023-08-16T12:48:31.265119+00:00; missing upstream values: ['expand_kwargs() argument']
   [2023-08-16T12:48:32.535+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2023-08-16T12:48:32.538+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0), upstream_task_ids={'tg1.task1'}
   [2023-08-16T12:48:32.538+0000] {taskinstance.py:1093} DEBUG - Dependencies not met for <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0), upstream_task_ids={'tg1.task1'}
   [2023-08-16T12:48:32.539+0000] {taskinstance.py:1112} DEBUG - <TaskInstance: example_task_group_expansion.tg1.task2 manual__2023-08-16T12:48:31.265119+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   ```
   
   As can be seen from the logs, none of the upstream tasks are in a `done` state, yet the expanded task is set to `upstream_failed`.
   
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

