kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705351657
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ ),
+ isouter=True,
+ )
+ .where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) <
DM.max_active_tasks
+ )
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by dag_id and
run_id.
+ # Different dags can have the same run_id but
+ # the dag_id combined with the run_id uniquely identify a run.
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=[TI.dag_id, TI.run_id],
+ order_by=[-TI.priority_weight, DR.logical_date,
TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
+ TI.priority_weight.label("priority_weight_for_ordering"),
+ DR.logical_date.label("logical_date_for_ordering"),
+ TI.map_index.label("map_index_for_ordering"),
+ )
+ ).subquery()
+
+ # Select only rows where row_number <= max_active_tasks.
+ query = (
+ select(TI)
+ .select_from(ranked_query)
+ .join(
+ TI,
+ (TI.dag_id == ranked_query.c.dag_id)
+ & (TI.task_id == ranked_query.c.task_id)
+ & (TI.run_id == ranked_query.c.run_id)
+ & (TI.map_index == ranked_query.c.map_index),
+ )
+ .where(ranked_query.c.row_num <=
ranked_query.c.dr_max_active_tasks)
+ # Add the order_by columns from the ranked query for sqlite.
+ .order_by(
+ -ranked_query.c.priority_weight_for_ordering,
Review Comment:
This doesn't account for tasks that are already running or queued. If `max_active_tasks
= 4` and 2 tasks are already running, we still return up to 4 scheduled tasks,
even though only 2 of them can actually be queued.
Should be:
```python
.where(ranked_query.c.row_num <= (ranked_query.c.dr_max_active_tasks -
func.coalesce(ranked_query.c.task_per_dr_count, 0)))
```
You'd need to add `task_per_dr_count` as a column in ranked_query.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ ),
+ isouter=True,
+ )
+ .where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) <
DM.max_active_tasks
+ )
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by dag_id and
run_id.
+ # Different dags can have the same run_id but
+ # the dag_id combined with the run_id uniquely identify a run.
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=[TI.dag_id, TI.run_id],
+ order_by=[-TI.priority_weight, DR.logical_date,
TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
+ TI.priority_weight.label("priority_weight_for_ordering"),
+ DR.logical_date.label("logical_date_for_ordering"),
+ TI.map_index.label("map_index_for_ordering"),
+ )
+ ).subquery()
+
+ # Select only rows where row_number <= max_active_tasks.
+ query = (
+ select(TI)
+ .select_from(ranked_query)
+ .join(
+ TI,
+ (TI.dag_id == ranked_query.c.dag_id)
+ & (TI.task_id == ranked_query.c.task_id)
+ & (TI.run_id == ranked_query.c.run_id)
+ & (TI.map_index == ranked_query.c.map_index),
+ )
+ .where(ranked_query.c.row_num <=
ranked_query.c.dr_max_active_tasks)
+ # Add the order_by columns from the ranked query for sqlite.
+ .order_by(
+ -ranked_query.c.priority_weight_for_ordering,
+ ranked_query.c.logical_date_for_ordering,
+ ranked_query.c.map_index_for_ordering,
+ )
+ )
+
if starved_pools:
query = query.where(TI.pool.not_in(starved_pools))
Review Comment:
This new query is missing `.options(selectinload(TI.dag_model))` which was
on the original query above. When we rebuild the query here, we lose the eager
loading - so every access to `ti.dag_model` later will trigger a separate
query. With 50 TIs that's 50+ extra queries per loop, partially negating the
perf gains.
The `.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")` hint from the
original query is also lost in the rebuild — both should be re-added here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]