kaxil commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2705274366
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ ),
+ isouter=True,
+ )
+ .where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) <
DM.max_active_tasks
+ )
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by dag_id and
run_id.
+ # Different dags can have the same run_id but
+ # the dag_id combined with the run_id uniquely identify a run.
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=[TI.dag_id, TI.run_id],
+ order_by=[-TI.priority_weight, DR.logical_date,
TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
Review Comment:
The row_number ranking is applied here, before the starvation filters
(starved_pools, starved_dags, etc.) are added below. In the original code, those
filters were applied BEFORE the limit.
This means tasks in starved pools will consume row_number slots and then get
filtered out later, potentially excluding schedulable tasks from the same dag
run that would have fit within the limit.
Should we apply starvation filters to the base query before building
ranked_query?
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1266,6 +1312,71 @@ def
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
]
assert len(b_tis_in_wrong_executor) == 0
+ @conf_vars(
+ {
+ ("scheduler", "max_tis_per_query"): "100",
+ ("scheduler", "max_dagruns_to_create_per_loop"): "10",
+ ("scheduler", "max_dagruns_per_loop_to_schedule"): "20",
+ ("core", "parallelism"): "100",
+ ("core", "max_active_tasks_per_dag"): "4",
+ ("core", "max_active_runs_per_dag"): "10",
+ ("core", "default_pool_task_slot_count"): "64",
+ }
+ )
+ def test_per_dr_limit_applied_in_task_query(self, dag_maker,
mock_executors):
+ scheduler_job = Job()
+ scheduler_job.executor.parallelism = 100
+ scheduler_job.executor.slots_available = 70
+ scheduler_job.max_tis_per_query = 100
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+ session = settings.Session()
+
+ # Use the same run_id.
+ task_maker(dag_maker, session, "dag_1300_tasks", 1300, 4, "run1")
+ task_maker(dag_maker, session, "dag_1200_tasks", 1200, 4, "run1")
+ task_maker(dag_maker, session, "dag_1100_tasks", 1100, 4, "run1")
+ task_maker(dag_maker, session, "dag_100_tasks", 100, 4, "run1")
+ task_maker(dag_maker, session, "dag_90_tasks", 90, 4, "run1")
+ task_maker(dag_maker, session, "dag_80_tasks", 80, 4, "run1")
+
+ count = 0
+ iterations = 0
+
+ from airflow.configuration import conf
+
+ task_num = conf.getint("core", "max_active_tasks_per_dag") * 6
+
+ # 6 dags * 4 = 24.
+ assert task_num == 24
+
+ queued_tis = None
+ while count < task_num:
+ # Use `_executable_task_instances_to_queued` because it returns a
list of TIs
+ # while `_critical_section_enqueue_task_instances` just returns
the number of the TIs.
+ queued_tis = self.job_runner._executable_task_instances_to_queued(
+ max_tis=scheduler_job.executor.slots_available, session=session
+ )
+ count += len(queued_tis)
+ iterations += 1
+
+ assert iterations == 1
+ assert count == task_num
+
+ assert queued_tis is not None
+
+ dag_counts = Counter(ti.dag_id for ti in queued_tis)
+
+ # Tasks from all 6 dags should have been queued.
+ assert len(dag_counts) == 6
+ assert dag_counts == {
+ "dag_1300_tasks": 4,
+ "dag_1200_tasks": 4,
+ "dag_1100_tasks": 4,
+ "dag_100_tasks": 4,
+ "dag_90_tasks": 4,
+ "dag_80_tasks": 4,
+ }, "Count for each dag_id should be 4 but it isn't"
+
def test_find_executable_task_instances_order_priority_with_pools(self,
dag_maker):
"""
The scheduler job should pick tasks with higher priority for execution
Review Comment:
A couple of test cases that would be good to add:
1. **Starvation filter ordering**: Create a dag run with tasks in mixed
pools (some starved, some not). Verify that tasks in non-starved pools aren't
excluded just because starved-pool tasks consumed row_number slots.
2. **Partial capacity**: Create a dag run with `max_active_tasks=4` where 2
tasks are already RUNNING. Add 10 SCHEDULED tasks. Verify the query only
returns 2 (not 4) scheduled tasks for that run.
Both would catch the edge cases where the current implementation
over-selects or under-fills.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -490,10 +515,60 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ ),
+ isouter=True,
+ )
+ .where(
+
func.coalesce(dr_task_concurrency_subquery.c.task_per_dr_count, 0) <
DM.max_active_tasks
+ )
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by dag_id and
run_id.
+ # Different dags can have the same run_id but
+ # the dag_id combined with the run_id uniquely identify a run.
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=[TI.dag_id, TI.run_id],
+ order_by=[-TI.priority_weight, DR.logical_date,
TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
+ TI.priority_weight.label("priority_weight_for_ordering"),
+ DR.logical_date.label("logical_date_for_ordering"),
+ TI.map_index.label("map_index_for_ordering"),
+ )
+ ).subquery()
+
+ # Select only rows where row_number <= max_active_tasks.
+ query = (
+ select(TI)
+ .select_from(ranked_query)
+ .join(
+ TI,
+ (TI.dag_id == ranked_query.c.dag_id)
+ & (TI.task_id == ranked_query.c.task_id)
+ & (TI.run_id == ranked_query.c.run_id)
+ & (TI.map_index == ranked_query.c.map_index),
+ )
+ .where(ranked_query.c.row_num <=
ranked_query.c.dr_max_active_tasks)
+ # Add the order_by columns from the ranked query for sqlite.
+ .order_by(
+ -ranked_query.c.priority_weight_for_ordering,
+ ranked_query.c.logical_date_for_ordering,
Review Comment:
This doesn't account for already running/queued tasks. If
`max_active_tasks = 4` and there are already 2 tasks running, this still
returns up to 4 scheduled tasks, but we can only queue 2.
It should be something like:
```python
.where(ranked_query.c.row_num <= (ranked_query.c.dr_max_active_tasks -
func.coalesce(ranked_query.c.task_per_dr_count, 0)))
```
You'd need to add `task_per_dr_count` as a column in ranked_query for this
to work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]