This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1193e5e29d3 Fix scheduler MySQL task instance index hint (#66785)
1193e5e29d3 is described below

commit 1193e5e29d371e66b16938aaab401847d9f67f68
Author: SilverGun <[email protected]>
AuthorDate: Mon May 18 13:45:39 2026 +0900

    Fix scheduler MySQL task instance index hint (#66785)
    
    Co-authored-by: nanaones <[email protected]>
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  1 -
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 29 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9a650b110c9..18e5dd3fd22 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -632,7 +632,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             # Select only rows where row_number <= max_active_tasks.
             query = (
                 select(TI)
-                .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
                 .select_from(ranked_query)
                 .join(
                     TI,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index ff1222c1222..5bb780fdbac 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -37,6 +37,7 @@ import psutil
 import pytest
 import time_machine
 from sqlalchemy import delete, func, select, update
+from sqlalchemy.dialects import mysql
 from sqlalchemy.orm import joinedload
 
 from airflow import settings
@@ -1248,6 +1249,34 @@ class TestSchedulerJob:
         assert {x.key for x in queued_tis} == {ti_non_backfill.key, ti_backfill.key}
         session.rollback()
 
+    def test_find_executable_task_instances_mysql_hint_only_applies_to_inner_query(self, dag_maker, session):
+        dag_id = "SchedulerJobTest.test_find_executable_task_instances_mysql_hint_only_applies_to_inner_query"
+        task_id = "dummy"
+        with dag_maker(dag_id=dag_id, max_active_tasks=16):
+            task = EmptyOperator(task_id=task_id)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        ti = dag_run.get_task_instance(task.task_id)
+        ti.state = State.SCHEDULED
+        session.merge(ti)
+        session.flush()
+
+        captured_queries = []
+
+        def capture_locked_query(query, **kwargs):
+            captured_queries.append(query)
+            return query
+
+        with mock.patch("airflow.jobs.scheduler_job_runner.with_row_locks", side_effect=capture_locked_query):
+            queued_tis = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
+
+        assert {queued_ti.key for queued_ti in queued_tis} == {ti.key}
+        compiled_query = str(captured_queries[0].compile(dialect=mysql.dialect()))
+        assert compiled_query.count("USE INDEX (ti_state)") == 1
+
     def test_find_executable_task_instances_pool(self, dag_maker):
         dag_id = "SchedulerJobTest.test_find_executable_task_instances_pool"
         task_id_1 = "dummy"

Reply via email to