tatiana opened a new issue, #56723:
URL: https://github.com/apache/airflow/issues/56723

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   Also reproduced in Airflow 3.0, 2.11, 2.10
   
   ### What happened?
   
   Airflow is not respecting the Task's `priority_weight` within a `TaskGroup` 
if the `TaskGroup` depends on another task.
   
   This issue does not happen if the `TaskGroup` does not depend on another 
task.
   
   This issue also does not happen if the exact topology of tasks is run 
directly in an Airflow `DAG`, without a `TaskGroup`.
   
   ### What you think should happen instead?
   
   Airflow should respect the Task's `priority_weight` within a `TaskGroup`, 
even if the `TaskGroup` depends on another task, having a consistent behaviour 
if the `TaskGroup` does not depend on other tasks and with Airflow `DAG`
   
   ### How to reproduce
   
   DAG representation:
   ```
       start → tg
   ```
   
   tg is composed by the following tasks:
   ```
       A → {B, C, E}
           C → D
           E → F
   ```
   
   Where A has the highest priority weight, and B and C can start before A has 
completed, but have a much lower priority weight.
   
   This topology is represented in the example DAG below, `bug_dag.py`:
   ```
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.task_group import TaskGroup
   from datetime import datetime
   import time
   
   def wait_3_seconds():
       time.sleep(3)
   
   
   with DAG(
       dag_id="bug_dag",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       default_args={
           "weight_rule": "absolute",
           "priority_weight": 10,
           "python_callable": wait_3_seconds,
       }
   ) as dag:
   
       # One task outside the TaskGroup
       start = PythonOperator(
           task_id="start"
       )
   
       # TaskGroup definition
       with TaskGroup(group_id="group_1") as group_1:
   
           # This task is critical and has the hightest prioroity weight, we 
expect it to be the first in the task group to be scheduled
           task_A = PythonOperator(
               task_id="A",
               priority_weight=1000,
               weight_rule="absolute",
           )
   
           # We are happy for B and C to start before A has finished
           task_B = PythonOperator(
               task_id="B",
               trigger_rule="always"
           )
   
           task_C = PythonOperator(
               task_id="C",
               trigger_rule="always"
           )
   
           task_D = PythonOperator(
               task_id="D"
           )
   
           task_E = PythonOperator(
               task_id="E"
           )
   
           task_F = PythonOperator(
               task_id="F"
           )
   
           # Dependencies inside the group
           task_A >> [task_B, task_C, task_E]
           task_C >> task_D
           task_E >> task_F
   
       # Overall DAG dependency
       start >> group_1
   ```
   
   The expectation is that the execution in the Gantt chart will be:
   - Phase 1. Run task `start` (before TaskGroup)
   - Phase 2. Task A, B and C can start together, with priority being given to 
A if the Airflow setup doesn't allow further concurrency
   - Phase 3. Other tasks follow their natural flow
   
   What is actually happening:
   - Phase 1. Run task `start` (before TaskGroup)
   - Phase 2: Tasks B and C start before task A
   - Phase 3: Other tasks follow their natural flow
   
   
   If we comment out the last line:
   ```
   #    start >> group_1
   ```
   
   The tasks priority weight is respected.
   
   
   ### Operating System
   
   MacOS
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   N/A
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to