Copilot commented on code in PR #54103:
URL: https://github.com/apache/airflow/pull/54103#discussion_r2594698573


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1180,6 +1226,102 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
 
+    def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+        dag_tasks = {}
+
+        with dag_maker(dag_id=dag_id):
+            for i in range(task_num):
+                # Assign priority weight to certain tasks.
+                if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                    weight = int(i / 2)
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", 
priority_weight=weight)
+                else:
+                    # No executor specified, runs on default executor
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        task_tis = {}
+
+        tis_list = []
+        for i in range(task_num):
+            task_tis[f"ti{i}"] = 
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+            # add
+            tis_list.append(task_tis[f"ti{i}"])
+
+        for ti in tis_list:
+            ti.state = State.SCHEDULED
+            ti.dag_model.max_active_tasks = 4
+
+        session.flush()
+
+        return tis_list

Review Comment:
   The `task_helper` method (lines 1229-1258) is nearly identical to the 
module-level `task_maker` function (lines 184-227). The only significant 
differences are:
   1. `task_helper` has a hardcoded `max_active_tasks = 4` (line 1254) instead 
of accepting it as a parameter
   2. `task_helper` doesn't support a custom `run_id` parameter
   
   Consider removing this duplicate method and using the `task_maker` function 
instead, or refactor to reduce code duplication. For example, the test at line 
1271 could use `task_maker` directly.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -181,6 +181,52 @@ def _create_dagrun(
     return _create_dagrun
 
 
+def task_maker(
+    dag_maker, session, dag_id: str, task_num: int, max_active_tasks: int, 
run_id: str | None = None
+):
+    dag_tasks = {}
+
+    with dag_maker(dag_id=dag_id):
+        for i in range(task_num):
+            # Assign priority weight to certain tasks.
+            if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...

Review Comment:
   The comment suggests the condition matches tasks at indices 10, 20, 30, 
etc., but the condition `(i % 10) == 0` actually matches indices 0, 10, 20, 30, 
etc. The comment should be updated to "# 0, 10, 20, 30, 40, ..." to accurately 
reflect when the condition is true.
   ```suggestion
               if (i % 10) == 0:  # 0, 10, 20, 30, 40, ...
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -488,10 +513,58 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session) -
                 .where(~DM.is_paused)
                 .where(TI.state == TaskInstanceState.SCHEDULED)
                 .where(DM.bundle_name.is_not(None))
+                .join(
+                    dr_task_concurrency_subquery,
+                    and_(
+                        TI.dag_id == dr_task_concurrency_subquery.c.dag_id,
+                        TI.run_id == dr_task_concurrency_subquery.c.run_id,
+                    ),
+                    isouter=True,
+                )
+                .where(func.coalesce(dr_task_concurrency_subquery.c.dr_count, 
0) < DM.max_active_tasks)
                 .options(selectinload(TI.dag_model))
                 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
             )
 
+            # Create a subquery with row numbers partitioned by dag_id and 
run_id.
+            # Different dags can have the same run_id but
+            # the dag_id combined with the run_id uniquely identify a run.
+            ranked_query = (
+                query.add_columns(
+                    func.row_number()
+                    .over(
+                        partition_by=[TI.dag_id, TI.run_id],
+                        order_by=[-TI.priority_weight, DR.logical_date, 
TI.map_index],
+                    )
+                    .label("row_num"),
+                    DM.max_active_tasks.label("dr_max_active_tasks"),
+                    # Create columns for the order_by checks here for sqlite.
+                    TI.priority_weight.label("priority_weight_for_ordering"),
+                    DR.logical_date.label("logical_date_for_ordering"),
+                    TI.map_index.label("map_index_for_ordering"),
+                )
+            ).subquery()
+
+            # Select only rows where row_number <= max_active_tasks.
+            query = (
+                select(TI)
+                .select_from(ranked_query)
+                .join(
+                    TI,
+                    (TI.dag_id == ranked_query.c.dag_id)
+                    & (TI.task_id == ranked_query.c.task_id)
+                    & (TI.run_id == ranked_query.c.run_id)
+                    & (TI.map_index == ranked_query.c.map_index),
+                )
+                .where(ranked_query.c.row_num <= 
ranked_query.c.dr_max_active_tasks)

Review Comment:
   The window function limits selection to `max_active_tasks` per dag_run 
without accounting for tasks that are already executing. For example, if a 
dag_run has 3 tasks already executing and `max_active_tasks=4`, this query 
would select up to 4 scheduled tasks (since `row_num` goes 1-4), but only 1 
can actually be queued. This means up to 3 extra tasks are retrieved and then 
rejected at line 701.
   
   Consider incorporating the current task count (`dr_count` from line 505) 
into the row number filter to improve efficiency:
   1. Add `dr_task_concurrency_subquery.c.dr_count` to the columns at line 533
   2. Change line 559 to: `where(ranked_query.c.row_num <= 
(ranked_query.c.dr_max_active_tasks - func.coalesce(ranked_query.c.dr_count, 
0)))`
   
   This would reduce unnecessary database retrievals and subsequent filtering 
in Python code.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1180,6 +1226,102 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
 
+    def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+        dag_tasks = {}
+
+        with dag_maker(dag_id=dag_id):
+            for i in range(task_num):
+                # Assign priority weight to certain tasks.
+                if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...
+                    weight = int(i / 2)
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}", 
priority_weight=weight)
+                else:
+                    # No executor specified, runs on default executor
+                    dag_tasks[f"op{i}"] = EmptyOperator(task_id=f"dummy{i}")
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        task_tis = {}
+
+        tis_list = []
+        for i in range(task_num):
+            task_tis[f"ti{i}"] = 
dag_run.get_task_instance(dag_tasks[f"op{i}"].task_id, session)
+            # add

Review Comment:
   [nitpick] The comment "# add" is too vague and doesn't add meaningful 
information. Consider either removing it or making it more descriptive, such as 
"# Add task instance to list for state updates".
   ```suggestion
               # Add task instance to list for state updates
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -1180,6 +1226,102 @@ def 
test_find_executable_task_instances_executor_with_teams(self, dag_maker, moc
         ]
         assert len(b_tis_in_wrong_executor) == 0
 
+    def task_helper(self, dag_maker, session, dag_id: str, task_num: int):
+        dag_tasks = {}
+
+        with dag_maker(dag_id=dag_id):
+            for i in range(task_num):
+                # Assign priority weight to certain tasks.
+                if (i % 10) == 0:  # 10, 20, 30, 40, 50, ...

Review Comment:
   The comment suggests the condition matches tasks at indices 10, 20, 30, 
etc., but the condition `(i % 10) == 0` actually matches indices 0, 10, 20, 30, 
etc. The comment should be updated to "# 0, 10, 20, 30, 40, 50, ..." to accurately 
reflect when the condition is true.
   ```suggestion
                   if (i % 10) == 0:  # 0, 10, 20, 30, 40, 50, ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to