xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2483790152


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 .where(~DM.is_paused)
                 .where(TI.state == TaskInstanceState.SCHEDULED)
                 .where(DM.bundle_name.is_not(None))
+                .join(
+                    dr_task_concurrency_subquery,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                    isouter=True,
+                )
+                .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)

Review Comment:
   I did some testing and here is what I found.
   
   Let's assume we have 3 dags whose runs share the same `run_id`, and all the checks key on `run_id` alone.
   
   * the query returns `max_active_tasks` tasks from the 1st dag and ignores the other two, because their runs share the same `run_id`
   * every subsequent iteration picks no new tasks for examination and queueing until the tasks from the 1st dag finish
   * once those tasks finish, it again picks `max_active_tasks` tasks, but still concentrates on a single dag
   
   The original join grouped tasks from all dags as if they all belonged to the same dag_run. Since the `(dag_id, run_id)` pair is unique, joining on both columns separates the tasks correctly.
   
   This was a serious logical bug. I don't know how common it is for multiple `dag_runs` to share the same `run_id`, but it's good to have that edge case accounted for already. Thanks @kaxil, great catch! With the suggested changes it works as it should.
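   The failure mode above can be sketched without the scheduler itself. Below is a minimal pure-Python sketch (not the actual SQLAlchemy query) that counts running tasks per key: keying by `run_id` alone collapses three distinct dag runs into one bucket, so the concurrency limit trips for all of them; keying by `(dag_id, run_id)` counts each run separately. The dag/run names and the `max_active_tasks` value are illustrative assumptions, not values from the PR.

   ```python
   from collections import Counter

   # Hypothetical running task instances: (dag_id, run_id) pairs.
   # Three different dags happen to share the same run_id.
   running = [
       ("dag_a", "manual__2024"),
       ("dag_b", "manual__2024"),
       ("dag_c", "manual__2024"),
   ]

   # Buggy keying: run_id only -- all three runs collapse into one bucket.
   by_run_id = Counter(run_id for _dag_id, run_id in running)

   # Fixed keying: (dag_id, run_id) -- each dag run is counted on its own.
   by_pair = Counter(running)

   max_active_tasks = 2  # illustrative per-run limit

   # Runs that look at or over the limit under each keying scheme.
   buggy_blocked = [k for k, v in by_run_id.items() if v >= max_active_tasks]
   fixed_blocked = [k for k, v in by_pair.items() if v >= max_active_tasks]

   # Buggy count sees 3 running tasks for "manual__2024" and blocks everything;
   # the fixed count sees 1 task per (dag_id, run_id) and blocks nothing.
   ```

   Under the buggy keying the shared `run_id` appears over the limit, matching the observed behavior where only one dag's tasks made progress at a time; under the fixed keying every run still has capacity.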



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
