kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705247082
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
Review Comment:
Nice - the outer join with COALESCE handles dag runs with 0 executing tasks
cleanly.
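
A minimal sketch of why the outer join plus COALESCE matters, using the standard-library `sqlite3` module rather than the PR's SQLAlchemy query. The table layout, the sample rows, and the choice of `running`/`queued` as the "executing" states are assumptions made for illustration only. A dag run with no executing tasks has no row in the count subquery, so an inner join would silently drop its scheduled tasks, while a LEFT JOIN with COALESCE treats the missing count as 0.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INTEGER);
    CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT);
    INSERT INTO dag VALUES ('a', 2), ('b', 2);
    INSERT INTO task_instance VALUES
        ('a', 'r1', 't1', 'running'),
        ('a', 'r1', 't2', 'running'),
        ('a', 'r1', 't3', 'scheduled'),
        ('b', 'r1', 't1', 'scheduled');
    """
)

# Hypothetical stand-in for the per-dag-run concurrency subquery in the diff.
QUERY = """
SELECT ti.dag_id, ti.run_id, ti.task_id
FROM task_instance AS ti
JOIN dag ON dag.dag_id = ti.dag_id
{join} JOIN (
    SELECT dag_id, run_id, COUNT(*) AS task_per_dr_count
    FROM task_instance
    WHERE state IN ('running', 'queued')
    GROUP BY dag_id, run_id
) AS c ON c.dag_id = ti.dag_id AND c.run_id = ti.run_id
WHERE ti.state = 'scheduled'
  AND {count} < dag.max_active_tasks
ORDER BY ti.dag_id, ti.task_id
"""

# Outer join + COALESCE: run ('b', 'r1') has no count row, so its count is 0.
outer_rows = conn.execute(
    QUERY.format(join="LEFT", count="COALESCE(c.task_per_dr_count, 0)")
).fetchall()

# Inner join: run ('b', 'r1') disappears because the subquery has no row for it.
inner_rows = conn.execute(
    QUERY.format(join="INNER", count="c.task_per_dr_count")
).fetchall()

print(outer_rows)
print(inner_rows)
```

In this toy data, dag `a` is already at its limit of 2, so its scheduled task is filtered out in both variants; only the outer-join variant keeps the scheduled task of dag `b`.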
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -194,6 +207,16 @@ def _is_parent_process() -> bool:
return multiprocessing.current_process().name == "MainProcess"
Review Comment:
This queries the same data as `ConcurrencyMap.load()` which is still called
and used for the check at lines 617-634. With the SQL-level filtering now in
place, that Python check should always pass (barring race conditions). Worth
adding a comment explaining why we keep both - race condition protection
between query time and check time?
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ ),
+ isouter=True,
+ )
+ .where(
+ func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) < DM.max_active_tasks
+ )
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by dag_id and run_id.
+ # Different dags can have the same run_id but
+ # the dag_id combined with the run_id uniquely identify a run.
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=[TI.dag_id, TI.run_id],
+ order_by=[-TI.priority_weight, DR.logical_date, TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
+ TI.priority_weight.label("priority_weight_for_ordering"),
+ DR.logical_date.label("logical_date_for_ordering"),
+ TI.map_index.label("map_index_for_ordering"),
+ )
+ ).subquery()
+
+ # Select only rows where row_number <= max_active_tasks.
+ query = (
+ select(TI)
+ .select_from(ranked_query)
+ .join(
+ TI,
+ (TI.dag_id == ranked_query.c.dag_id)
+ & (TI.task_id == ranked_query.c.task_id)
+ & (TI.run_id == ranked_query.c.run_id)
+ & (TI.map_index == ranked_query.c.map_index),
+ )
+ .where(ranked_query.c.row_num <= ranked_query.c.dr_max_active_tasks)
+ # Add the order_by columns from the ranked query for sqlite.
+ .order_by(
+ -ranked_query.c.priority_weight_for_ordering,
+ ranked_query.c.logical_date_for_ordering,
+ ranked_query.c.map_index_for_ordering,
+ )
+ )
Review Comment:
This new query is missing `.options(selectinload(TI.dag_model))` which was
on the original query above. When we rebuild the query here, we lose the eager
loading - so every access to `ti.dag_model` later will trigger a separate
query. With 50 TIs that's 50+ extra queries per loop, which partially negates
the perf gains from this PR.
Also missing the `.with_hint(TI, "USE INDEX (ti_state)",
dialect_name="mysql")` - should add both here.
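
To make the cost concrete, here is a rough model of the two access patterns using plain `sqlite3`, not SQLAlchemy. It is a sketch under stated assumptions: the tables, the `run` helper, and the one-DAG-per-TI data are hypothetical, and it ignores any caching an ORM session may do, so it shows the worst case. The batched variant mirrors the idea behind `selectinload`, which fetches the related rows with a single `IN (...)` query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INTEGER);
    CREATE TABLE task_instance (task_id TEXT, dag_id TEXT);
    """
)
# Assumed worst case: 50 task instances, each belonging to a different DAG.
conn.executemany("INSERT INTO dag VALUES (?, ?)", [(f"dag_{i}", 16) for i in range(50)])
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)", [(f"t{i}", f"dag_{i}") for i in range(50)]
)


def run(counter, sql, params=()):
    """Execute one statement and record that a round trip happened."""
    counter[0] += 1
    return conn.execute(sql, params).fetchall()


def lazy_load():
    """One query for the TIs, then one query per TI for its DAG row."""
    counter = [0]
    tis = run(counter, "SELECT task_id, dag_id FROM task_instance")
    limits = {
        task_id: run(
            counter, "SELECT max_active_tasks FROM dag WHERE dag_id = ?", (dag_id,)
        )[0][0]
        for task_id, dag_id in tis
    }
    return limits, counter[0]


def batched_load():
    """One query for the TIs, then one IN (...) query for all DAG rows."""
    counter = [0]
    tis = run(counter, "SELECT task_id, dag_id FROM task_instance")
    dag_ids = sorted({dag_id for _, dag_id in tis})
    placeholders = ", ".join("?" * len(dag_ids))
    rows = run(
        counter,
        f"SELECT dag_id, max_active_tasks FROM dag WHERE dag_id IN ({placeholders})",
        dag_ids,
    )
    by_dag = dict(rows)
    return {task_id: by_dag[dag_id] for task_id, dag_id in tis}, counter[0]


lazy_result, lazy_count = lazy_load()
batched_result, batched_count = batched_load()
print(lazy_count, batched_count)
```

Both functions return the same mapping; the difference is only in the number of statements issued per scheduler loop.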
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -515,7 +590,13 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
try:
locked_query = with_row_locks(query, of=TI, session=session, skip_locked=True)
- task_instances_to_examine: list[TI] = list(session.scalars(locked_query).all())
+ task_instances_to_examine = session.scalars(locked_query).all()
+
+ self.log.debug("Length of the tis to examine is %d", len(task_instances_to_examine))
+ self.log.debug(
+ "TaskInstance selection is: %s",
Review Comment:
nit: The `Counter()` iteration happens even when debug logging is disabled.
Not a big deal but if we're optimizing for perf, might want to guard this:
```python
if self.log.isEnabledFor(logging.DEBUG):
    self.log.debug(...)
```
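
A small standalone sketch of the effect, using only the standard library. The logger name, the `summarize` helper, and the sample tuples are hypothetical; the point is that arguments to `log.debug(...)` are evaluated before the logger decides whether to emit, so an expensive argument is computed even at INFO level unless the call is guarded.

```python
import logging
from collections import Counter

log = logging.getLogger("scheduler_sketch")  # hypothetical logger name
log.setLevel(logging.INFO)  # DEBUG is disabled for this logger

calls = []


def summarize(tis):
    """Stand-in for the Counter() iteration; records every invocation."""
    calls.append(len(tis))
    return Counter(dag_id for dag_id, _ in tis)


tis = [("dag_a", "t1"), ("dag_a", "t2"), ("dag_b", "t1")]

# Unguarded: summarize() runs even though the message is never emitted.
log.debug("TaskInstance selection is: %s", summarize(tis))
unguarded_calls = len(calls)

# Guarded: summarize() is skipped entirely when DEBUG is disabled.
if log.isEnabledFor(logging.DEBUG):
    log.debug("TaskInstance selection is: %s", summarize(tis))
guarded_calls = len(calls) - unguarded_calls

print(unguarded_calls, guarded_calls)
```

Lazy `%s` formatting only defers the string interpolation; it does not defer computing the argument itself, which is why the guard is needed for anything non-trivial.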