kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2479689911
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ isouter=True,
+ )
+ .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
Review Comment:
Hmm, `run_id` is only unique per DAG, but this subquery groups by `run_id`
alone and the join uses just `TI.run_id`, so the counts of every DAG that shares
a `run_id` get pooled together. When one DAG reaches `max_active_tasks`, every
other DAG that happens to share the same `run_id` string (common for scheduled
runs) now fails the `< DM.max_active_tasks` check and never gets considered.
Better to group by `(dag_id, run_id)` and join on both columns instead?
https://github.com/apache/airflow/blob/085b0edfe7c7151517cd727020c13a2834409210/airflow-core/src/airflow/models/dagrun.py#L233
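To make the collision concrete, here is a minimal sketch in plain SQLite (via Python's `sqlite3`) rather than the PR's SQLAlchemy query. The two-table schema, the `schedulable` helper, and the assumption that the concurrency subquery counts RUNNING and QUEUED task instances are hypothetical simplifications, not Airflow's real models. Both DAGs share one `run_id` string: grouping by `run_id` alone starves `dag_b`, while grouping and joining on `(dag_id, run_id)` does not.

```python
import sqlite3

# Hypothetical run_id shared by two different DAGs (typical for scheduled runs).
RUN = "scheduled__2024-01-01T00:00:00+00:00"


def schedulable(conn, key_cols):
    """Return (dag_id, task_id) for SCHEDULED TIs whose run is under its limit.

    key_cols is "run_id" (grouping as in the PR) or "dag_id, run_id" (the fix).
    """
    cols = [c.strip() for c in key_cols.split(",")]
    # Join the count subquery on every grouping column, not just run_id.
    join_cond = " AND ".join(f"ti.{col} = c.{col}" for col in cols)
    sql = f"""
        SELECT ti.dag_id, ti.task_id
        FROM task_instance AS ti
        JOIN dag AS d ON d.dag_id = ti.dag_id
        LEFT OUTER JOIN (
            -- Assumption: "active" means RUNNING or QUEUED.
            SELECT {key_cols}, COUNT(*) AS dr_count
            FROM task_instance
            WHERE state IN ('running', 'queued')
            GROUP BY {key_cols}
        ) AS c ON {join_cond}
        WHERE ti.state = 'scheduled'
          AND COALESCE(c.dr_count, 0) < d.max_active_tasks
        ORDER BY ti.dag_id, ti.task_id
    """
    return conn.execute(sql).fetchall()


# Simplified, hypothetical tables with only the columns the check needs.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INTEGER);
    CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT);
    """
)
conn.executemany("INSERT INTO dag VALUES (?, ?)", [("dag_a", 2), ("dag_b", 2)])
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("dag_a", RUN, "t1", "running"),  # dag_a is at its limit of 2
        ("dag_a", RUN, "t2", "queued"),
        ("dag_a", RUN, "t3", "scheduled"),
        ("dag_b", RUN, "t1", "scheduled"),  # dag_b has nothing active
    ],
)

print(schedulable(conn, "run_id"))          # [] (dag_b is starved too)
print(schedulable(conn, "dag_id, run_id"))  # [('dag_b', 't1')]
```

With the pooled count, `dag_b` sees `dr_count = 2` from `dag_a`'s tasks and fails `2 < 2`; with the per-DAG count it has no matching row, so `COALESCE` yields 0 and its task passes.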
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ isouter=True,
+ )
+ .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by run_id.
+ #
+ # run_id | task_id | priority_weight | row_num
+ # ----------|---------|-----------------|--------
+ # dag1_dr1 | task1 | 100 | 1
+ # dag1_dr1 | task22 | 90 | 2
+ # dag1_dr1 | task5 | 80 | 3
+ # dag1_dr1 | task13 | 70 | 4
+ # dag2_dr1 | task3 | 95 | 1
+ # dag2_dr1 | task1 | 85 | 2
+ # dag2_dr1 | task5 | 75 | 3
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=TI.run_id,
+ order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
+ TI.priority_weight.label("priority_weight_for_ordering"),
+ DR.logical_date.label("logical_date_for_ordering"),
+ TI.map_index.label("map_index_for_ordering"),
+ )
+ ).subquery()
+
+ # Select only rows where row_number <= max_active_tasks.
+ query = (
+ select(TI)
+ .select_from(ranked_query)
+ .join(
+ TI,
+ (TI.dag_id == ranked_query.c.dag_id)
+ & (TI.task_id == ranked_query.c.task_id)
+ & (TI.run_id == ranked_query.c.run_id)
+ & (TI.map_index == ranked_query.c.map_index),
+ )
+ .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+ # Add the order_by columns from the ranked query for sqlite.
+ .order_by(
+ -ranked_query.c.priority_weight_for_ordering,
+ ranked_query.c.logical_date_for_ordering,
+ ranked_query.c.map_index_for_ordering,
+ )
+ )
+
Review Comment:
Same issue here: the window partitions only on `run_id`, which collides across
DAGs. Row numbers are assigned across the scheduled TIs of every DAG sharing that
`run_id`, so once one DAG consumes its `max_active_tasks` worth of row numbers,
every other DAG with the same `run_id` is filtered out by
`row_num <= dr_max_active_tasks`, even if it has zero active tasks.
It would be better to partition by both `TI.dag_id` and `TI.run_id` (or the dag
run PK) before applying the limit, so that each DAG run keeps its own quota.
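The window-function variant of the collision can be reproduced with a similar minimal SQLite sketch. The flattened `scheduled_ti` table and the `limited` helper are hypothetical, and ranking is simplified to `priority_weight` only (SQLite needs version 3.25+ for window functions). With `PARTITION BY run_id`, `dag_a`'s higher-priority tasks take row numbers 1 to 3 and `dag_b`'s only task lands at `row_num` 4, above its limit of 2; partitioning by `dag_id, run_id` restarts the numbering per DAG run. In SQLAlchemy, `over()` accepts a list here, e.g. `partition_by=[TI.dag_id, TI.run_id]`.

```python
import sqlite3

# Hypothetical flattened rows of SCHEDULED TIs:
# (dag_id, run_id, task_id, priority_weight, max_active_tasks)
RUN = "scheduled__2024-01-01T00:00:00+00:00"
ROWS = [
    ("dag_a", RUN, "t1", 100, 2),
    ("dag_a", RUN, "t2", 90, 2),
    ("dag_a", RUN, "t3", 80, 2),
    ("dag_b", RUN, "t1", 70, 2),
]


def limited(conn, partition_by):
    """Keep at most max_active_tasks TIs per window partition."""
    sql = f"""
        SELECT dag_id, task_id FROM (
            SELECT dag_id, task_id, priority_weight, max_active_tasks,
                   ROW_NUMBER() OVER (
                       PARTITION BY {partition_by}
                       ORDER BY priority_weight DESC
                   ) AS row_num
            FROM scheduled_ti
        )
        WHERE row_num <= max_active_tasks
        ORDER BY priority_weight DESC
    """
    return conn.execute(sql).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scheduled_ti (dag_id TEXT, run_id TEXT, task_id TEXT,"
    " priority_weight INTEGER, max_active_tasks INTEGER)"
)
conn.executemany("INSERT INTO scheduled_ti VALUES (?, ?, ?, ?, ?)", ROWS)

print(limited(conn, "run_id"))          # [('dag_a', 't1'), ('dag_a', 't2')]
print(limited(conn, "dag_id, run_id"))  # [('dag_a', 't1'), ('dag_a', 't2'), ('dag_b', 't1')]
```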
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -408,10 +433,63 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ isouter=True,
+ )
+ .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 0) < DM.max_active_tasks)
Review Comment:
<img width="856" height="528" alt="Image"
src="https://github.com/user-attachments/assets/80293a36-7909-40c9-95e2-21a2fb9ba81a"
/>
--
This is an automated message from the Apache Git Service.