tatiana opened a new issue, #56723:
URL: https://github.com/apache/airflow/issues/56723
### Apache Airflow version
3.1.0
### If "Other Airflow 2/3 version" selected, which one?
Also reproduced in Airflow 3.0, 2.11, 2.10
### What happened?
Airflow does not respect a task's `priority_weight` within a `TaskGroup`
when the `TaskGroup` depends on another task.
The issue does not occur if the `TaskGroup` has no upstream task.
It also does not occur if the same task topology is defined directly in the
`DAG`, without a `TaskGroup`.
### What you think should happen instead?
Airflow should respect each task's `priority_weight` within a `TaskGroup`,
even when the `TaskGroup` depends on another task. This would be consistent with
the behaviour when the `TaskGroup` has no upstream task, and when the same tasks
are defined directly in the `DAG`.
### How to reproduce
DAG representation:
```
start → tg
```
`tg` is composed of the following tasks:
```
A → {B, C, E}
C → D
E → F
```
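The following sketch makes the dependency wiring explicit. It models `tg` as plain Python data and computes which tasks `start → tg` attaches to. It assumes that `>>` on a `TaskGroup` links the upstream task to the group's root tasks, meaning those with no upstream inside the group. This matches our understanding of Airflow's `TaskGroup`, but it is not verified here. The helpers `group_roots` and `upstream_map` are hypothetical names and are not part of Airflow's API. Under that assumption, only A gets `start` as a direct upstream. B and C still depend only on A.

```python
# Topology of the TaskGroup "tg" from the issue, as upstream -> downstream edges.
TG_EDGES = {
    "A": ["B", "C", "E"],
    "C": ["D"],
    "E": ["F"],
}
TG_TASKS = ["A", "B", "C", "D", "E", "F"]


def group_roots(tasks, edges):
    """Return tasks with no upstream task inside the group, in declaration order."""
    has_upstream = {child for children in edges.values() for child in children}
    return [t for t in tasks if t not in has_upstream]


def upstream_map(tasks, edges, external=None):
    """Map each task to its direct upstream tasks.

    `external` is an optional task outside the group (e.g. "start") that is
    wired to every root of the group, modelling `start >> tg` (assumption).
    """
    upstream = {t: [] for t in tasks}
    for parent, children in edges.items():
        for child in children:
            upstream[child].append(parent)
    if external is not None:
        for root in group_roots(tasks, edges):
            upstream[root].append(external)
    return upstream


print(group_roots(TG_TASKS, TG_EDGES))
# ['A']
print(upstream_map(TG_TASKS, TG_EDGES, external="start"))
# {'A': ['start'], 'B': ['A'], 'C': ['A'], 'D': ['C'], 'E': ['A'], 'F': ['E']}
```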
A has the highest priority weight. B and C may start before A has completed,
but they have a much lower priority weight.
This topology is represented in the example DAG below, `bug_dag.py`:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
import time


def wait_3_seconds():
    time.sleep(3)


with DAG(
    dag_id="bug_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={
        "weight_rule": "absolute",
        "priority_weight": 10,
        "python_callable": wait_3_seconds,
    },
) as dag:
    # One task outside the TaskGroup
    start = PythonOperator(
        task_id="start"
    )

    # TaskGroup definition
    with TaskGroup(group_id="group_1") as group_1:
        # This task is critical and has the highest priority weight; we
        # expect it to be the first task in the group to be scheduled
        task_A = PythonOperator(
            task_id="A",
            priority_weight=1000,
            weight_rule="absolute",
        )
        # We are happy for B and C to start before A has finished
        task_B = PythonOperator(
            task_id="B",
            trigger_rule="always"
        )
        task_C = PythonOperator(
            task_id="C",
            trigger_rule="always"
        )
        task_D = PythonOperator(
            task_id="D"
        )
        task_E = PythonOperator(
            task_id="E"
        )
        task_F = PythonOperator(
            task_id="F"
        )

        # Dependencies inside the group
        task_A >> [task_B, task_C, task_E]
        task_C >> task_D
        task_E >> task_F

    # Overall DAG dependency
    start >> group_1
```
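The DAG sets `weight_rule="absolute"`, so each task's effective priority should be its own `priority_weight`. As a rough cross-check, the sketch below recomputes effective weights for this topology under the three built-in rules. It assumes the documented semantics: `downstream` is the task's own weight plus all descendants, `upstream` is its own weight plus all ancestors, and `absolute` is its own weight only. This is a simplified model, not Airflow's code. It also assumes that `start >> group_1` only reaches A, the group's single root. `effective_weight` is a hypothetical helper.

```python
# Same topology as the example DAG (task_ids without the "group_1." prefix).
# Assumption: start >> group_1 links start only to the group's root, A.
EDGES = {
    "start": ["A"],
    "A": ["B", "C", "E"],
    "C": ["D"],
    "E": ["F"],
}
# Own priority_weight per task: 10 from default_args, overridden to 1000 on A.
WEIGHTS = {"start": 10, "A": 1000, "B": 10, "C": 10, "D": 10, "E": 10, "F": 10}


def descendants(task, edges):
    """All tasks reachable from `task` by following edges (iterative DFS)."""
    seen = set()
    stack = list(edges.get(task, []))
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(edges.get(t, []))
    return seen


def ancestors(task, edges):
    """All tasks that can reach `task`, found by walking the reversed graph."""
    reverse = {}
    for parent, children in edges.items():
        for child in children:
            reverse.setdefault(child, []).append(parent)
    return descendants(task, reverse)


def effective_weight(task, rule):
    """Simplified model of a weight rule: own weight plus related tasks' weights."""
    if rule == "absolute":
        return WEIGHTS[task]
    related = descendants(task, EDGES) if rule == "downstream" else ancestors(task, EDGES)
    return WEIGHTS[task] + sum(WEIGHTS[t] for t in related)


for rule in ("absolute", "downstream", "upstream"):
    print(rule, {t: effective_weight(t, rule) for t in ("A", "B", "C")})
# absolute {'A': 1000, 'B': 10, 'C': 10}
# downstream {'A': 1050, 'B': 10, 'C': 20}
# upstream {'A': 1010, 'B': 1020, 'C': 1020}
```

Under this model, A outranks B and C under both `absolute` and `downstream`. Only `upstream` would push B and C above A.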
The expectation is that the execution in the Gantt chart will be:
- Phase 1: Run task `start` (before the TaskGroup)
- Phase 2: Tasks A, B and C can start together, with priority given to
A if the Airflow setup doesn't allow further concurrency
- Phase 3: Other tasks follow their natural flow
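The expected Phase 2 behaviour is priority-ordered slot allocation. When fewer slots are free than there are runnable tasks, the task with the highest `priority_weight` should go first. The function below, `pick_tasks`, is a hypothetical and heavily simplified model of that rule. It ignores pools, queues, `max_active_tasks` and all other scheduler details, and it is not Airflow's scheduler code.

```python
import heapq


def pick_tasks(ready, weights, free_slots):
    """Return up to `free_slots` ready task ids, highest priority_weight first.

    Ties are broken by task id so the result is deterministic.
    """
    return heapq.nsmallest(free_slots, ready, key=lambda t: (-weights[t], t))


WEIGHTS = {"A": 1000, "B": 10, "C": 10}

# Phase 2 with only one free slot: A should win regardless of input order.
print(pick_tasks(["B", "C", "A"], WEIGHTS, 1))
# ['A']
# With two free slots: A first, then B (tie with C broken by task id).
print(pick_tasks(["B", "C", "A"], WEIGHTS, 2))
# ['A', 'B']
```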
What is actually happening:
- Phase 1. Run task `start` (before TaskGroup)
- Phase 2: Tasks B and C start before task A
- Phase 3: Other tasks follow their natural flow
If we comment out the last line:
```
# start >> group_1
```
then the tasks' priority weights are respected.
### Operating System
MacOS
### Versions of Apache Airflow Providers
N/A
### Deployment
Other
### Deployment details
N/A
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)