xBis7 commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2483790152
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ isouter=True,
+ )
+ .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
Review Comment:
I did some testing and here is what I found.
Let's assume we have 3 dags whose dag runs share the same `run_id`, and all
the checks use only the `run_id` (a toy reproduction is sketched after this
list).
* the query returns up to `max_active_tasks` task instances from the 1st dag
and ignores the other two, because their runs share the same `run_id`
* every subsequent scheduler iteration picks no new tasks for examination and
queueing until the tasks from the 1st dag finish
* once those tasks finish, it again picks up to `max_active_tasks` tasks, but
again concentrated on a single dag
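To make this concrete, here is a minimal, self-contained sketch (a toy table
and toy data, not Airflow's real schema) showing how grouping by `run_id`
alone merges counts across dags:
```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TI(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    run_id = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Three dags whose runs share the same run_id.
    session.add_all([TI(dag_id=f"dag_{i}", run_id="shared_run") for i in range(3)])
    session.commit()

    # Grouping by run_id alone collapses all three dags into one count of 3.
    by_run = select(TI.run_id, func.count().label("dr_count")).group_by(TI.run_id)
    print(session.execute(by_run).all())   # [('shared_run', 3)]

    # Grouping by the unique (dag_id, run_id) pair keeps each dag separate.
    by_pair = select(TI.dag_id, TI.run_id, func.count().label("dr_count")).group_by(
        TI.dag_id, TI.run_id
    )
    print(session.execute(by_pair).all())  # one row per dag, each with count 1
```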
In effect, the old join grouped tasks from all dags as if they all belonged
to the same dag_run. The `dag_id, run_id` pair is unique, so joining on both
columns now separates the tasks properly.
This was a serious logical bug. I don't know how common it is for multiple
`dag_runs` to share the same `run_id`, but it's good to have that edge case
accounted for already. Thanks @kaxil, great catch! With the suggested changes
it works as it should; a sketch of the fixed query shape is below.
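For anyone following along, this is roughly the shape of the fixed query.
The actual suggestion lives in the linked thread; in particular, the
subquery is assumed here to select and group by `dag_id` alongside `run_id`:
```python
from sqlalchemy import and_, func, select

# Hypothetical shape of the corrected chain (names taken from the diff above;
# dr_task_concurrency_subquery exposing a dag_id column is my assumption):
query = (
    select(TI)
    .where(~DM.is_paused)
    .where(TI.state == TaskInstanceState.SCHEDULED)
    .where(DM.bundle_name.is_not(None))
    .join(
        dr_task_concurrency_subquery,
        # Join on the unique (dag_id, run_id) pair instead of run_id alone,
        # so runs from different dags are no longer merged together.
        and_(
            TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
            TI.run_id == dr_task_concurrency_subquery.c.run_id,
        ),
        isouter=True,
    )
    .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
)
```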
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]