Copilot commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2594698573
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1180,6 +1226,102 @@ def
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
]
assert len(b_tis_in_wrong_executor) == 0
+ def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+ dag_tasks = {}
+
+ with dag_maker(dag_id=dag_id):
+ for i in range(task_num):
+ # Assign priority weight to certain tasks.
+ if (i % 10) == 0: # 10, 20, 30, 40, 50, ...
+ weight = int(i / 2)
+ dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}",
priority_weight=weight)
+ else:
+ # No executor specified, runs on default executor
+ dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+ dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+ task_tis = {}
+
+ tis_list = []
+ for i in range(task_num):
+ task_tis[f"ti{i}"] =
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+ # add
+ tis_list.append(task_tis[f"ti{i}"])
+
+ for ti in tis_list:
+ ti.state = State.SCHEDULED
+ ti.dag_model.max_active_tasks = 4
+
+ session.flush()
+
+ return tis_list
Review Comment:
The `task_helper` method (lines 1229-1258) is nearly identical to the
module-level `task_maker` function (lines 184-227). The only significant
differences are:
1. `task_helper` has a hardcoded `max_active_tasks = 4` (line 1254) instead
of accepting it as a parameter
2. `task_helper` doesn't support a custom `run_id` parameter
Consider removing this duplicate method and using the `task_maker` function
instead, or refactoring to reduce code duplication. For example, the test at
line 1271 could use `task_maker` directly.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -181,6 +181,52 @@ def _create_dagrun(
return _create_dagrun
+def task_maker(
+ dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int,
run_id: str | None = None
+):
+ dag_tasks = {}
+
+ with dag_maker(dag_id=dag_id):
+ for i in range(task_num):
+ # Assign priority weight to certain tasks.
+ if (i % 10) == 0: # 10, 20, 30, 40, 50, ...
Review Comment:
The comment suggests the condition matches tasks at indices 10, 20, 30,
etc., but the condition `(i % 10) == 0` actually matches indices 0, 10, 20, 30,
etc. The comment should be updated to "# 0, 10, 20, 30, 40, ..." to accurately
reflect when the condition is true.
```suggestion
if (i % 10) == 0: # 0, 10, 20, 30, 40, ...
```
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
.where(DM.bundle_name.is_not(None))
+ .join(
+ dr_task_concurrency_subquery,
+ and_(
+ TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+ TI.run_id == dr_task_concurrency_subquery.c.run_id,
+ ),
+ isouter=True,
+ )
+ .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count,
0) < DM.max_active_tasks)
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
+ # Create a subquery with row numbers partitioned by dag_id and
run_id.
+ # Different dags can have the same run_id but
+ # the dag_id combined with the run_id uniquely identify a run.
+ ranked_query = (
+ query.add_columns(
+ func.row_number()
+ .over(
+ partition_by=[TI.dag_id, TI.run_id],
+ order_by=[-TI.priority_weight, DR.logical_date,
TI.map_index],
+ )
+ .label("row_num"),
+ DM.max_active_tasks.label("dr_max_active_tasks"),
+ # Create columns for the order_by checks here for sqlite.
+ TI.priority_weight.label("priority_weight_for_ordering"),
+ DR.logical_date.label("logical_date_for_ordering"),
+ TI.map_index.label("map_index_for_ordering"),
+ )
+ ).subquery()
+
+ # Select only rows where row_number <= max_active_tasks.
+ query = (
+ select(TI)
+ .select_from(ranked_query)
+ .join(
+ TI,
+ (TI.dag_id == ranked_query.c.dag_id)
+ & (TI.task_id == ranked_query.c.task_id)
+ & (TI.run_id == ranked_query.c.run_id)
+ & (TI.map_index == ranked_query.c.map_index),
+ )
+ .where(ranked_query.c.row_num <=
ranked_query.c.dr_max_active_tasks)
Review Comment:
The window function limits selection to `max_active_tasks` per dag_run
without considering currently executing tasks. For example, if a dag_run has 3
tasks already executing and `max_active_tasks=4`, this query would select up to
4 scheduled tasks (since rows with `row_num` 1-4 pass the filter), but only 1
can actually be queued. This means up to 3 extra tasks are retrieved and then
rejected at line 701.
Consider incorporating the current task count (`dr_count` from line 505)
into the row number filter to improve efficiency:
1. Add `dr_task_concurrency_subquery.c.dr_count` to the columns at line 533
2. Change line 559 to: `.where(ranked_query.c.row_num <=
(ranked_query.c.dr_max_active_tasks - func.coalesce(ranked_query.c.dr_count,
0)))`
This would reduce unnecessary database retrievals and subsequent filtering
in Python code.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1180,6 +1226,102 @@ def
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
]
assert len(b_tis_in_wrong_executor) == 0
+ def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+ dag_tasks = {}
+
+ with dag_maker(dag_id=dag_id):
+ for i in range(task_num):
+ # Assign priority weight to certain tasks.
+ if (i % 10) == 0: # 10, 20, 30, 40, 50, ...
+ weight = int(i / 2)
+ dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}",
priority_weight=weight)
+ else:
+ # No executor specified, runs on default executor
+ dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+ dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+ task_tis = {}
+
+ tis_list = []
+ for i in range(task_num):
+ task_tis[f"ti{i}"] =
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+ # add
Review Comment:
[nitpick] The comment "# add" is too vague and doesn't add meaningful
information. Consider either removing it or making it more descriptive, such as
"# Add task instance to list for state updates".
```suggestion
# Add task instance to list for state updates
```
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1180,6 +1226,102 @@ def
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
]
assert len(b_tis_in_wrong_executor) == 0
+ def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+ dag_tasks = {}
+
+ with dag_maker(dag_id=dag_id):
+ for i in range(task_num):
+ # Assign priority weight to certain tasks.
+ if (i % 10) == 0: # 10, 20, 30, 40, 50, ...
Review Comment:
The comment suggests the condition matches tasks at indices 10, 20, 30,
etc., but the condition `(i % 10) == 0` actually matches indices 0, 10, 20, 30,
etc. The comment should be updated to "# 0, 10, 20, 30, 40, 50, ..." to accurately
reflect when the condition is true.
```suggestion
if (i % 10) == 0: # 0, 10, 20, 30, 40, 50, ...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]