This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7eeec694f3e SQLA2: fix types + upgrade `query()` to `execute()` 
(#59690)
7eeec694f3e is described below

commit 7eeec694f3e890768bbeb3e24663f642f9ae9ac9
Author: Dev-iL <[email protected]>
AuthorDate: Mon Dec 22 06:10:55 2025 +0200

    SQLA2: fix types + upgrade `query()` to `execute()` (#59690)
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index eb8e7b22770..9f85ac7cc11 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2450,8 +2450,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     @provide_session
     def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None:
         metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, 
State.DEFERRED}
-        all_states_metric = (
-            session.query(
+        stmt = (
+            select(
                 TaskInstance.state,
                 TaskInstance.dag_id,
                 TaskInstance.task_id,
@@ -2460,22 +2460,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             .filter(TaskInstance.state.in_(metric_states))
             .group_by(TaskInstance.state, TaskInstance.dag_id, 
TaskInstance.task_id, TaskInstance.queue)
-            .all()
         )
+        all_states_metric = session.execute(stmt).all()
 
         for state in metric_states:
             if state not in self.previous_ti_metrics:
                 self.previous_ti_metrics[state] = {}
 
             ti_metrics = {
-                (row.dag_id, row.task_id, row.queue): row.count
-                for row in all_states_metric
-                if row.state == state
+                (dag_id, task_id, queue): count
+                for row_state, dag_id, task_id, queue, count in 
all_states_metric
+                if row_state == state
             }
 
             for (dag_id, task_id, queue), count in ti_metrics.items():
-                Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", count)
-                Stats.gauge(f"ti.{state}", count, tags={"queue": queue, 
"dag_id": dag_id, "task_id": task_id})
+                Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", 
float(count))
+                Stats.gauge(
+                    f"ti.{state}", float(count), tags={"queue": queue, 
"dag_id": dag_id, "task_id": task_id}
+                )
 
             for prev_key in self.previous_ti_metrics[state]:
                 # Reset previously exported stats that are no longer present 
in current metrics to zero

Reply via email to