ashb commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2480697365


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 .where(~DM.is_paused)
                 .where(TI.state == TaskInstanceState.SCHEDULED)
                 .where(DM.bundle_name.is_not(None))
+                .join(
+                    dr_task_concurrency_subquery,
+                    TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                    isouter=True,
+                )
+                .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)

Review Comment:
   Yup, you are likely right.
   
   I'm testing now with
   
   ```
   for dag in dag_45_tasks dag_250_tasks dag_470_tasks dag_10_tasks dag_1000_tasks; do
     airflowctl dagrun trigger --dag-id "$dag" --dag-run-id run1
   done
   for dag in dag_45_tasks dag_250_tasks dag_470_tasks dag_10_tasks dag_1000_tasks; do
     airflowctl dagrun trigger --dag-id "$dag" --dag-run-id run2
   done
   ```


