This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7eeec694f3e SQLA2: fix types + upgrade `query()` to `execute()`
(#59690)
7eeec694f3e is described below
commit 7eeec694f3e890768bbeb3e24663f642f9ae9ac9
Author: Dev-iL <[email protected]>
AuthorDate: Mon Dec 22 06:10:55 2025 +0200
SQLA2: fix types + upgrade `query()` to `execute()` (#59690)
---
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index eb8e7b22770..9f85ac7cc11 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2450,8 +2450,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
@provide_session
def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None:
metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING,
State.DEFERRED}
- all_states_metric = (
- session.query(
+ stmt = (
+ select(
TaskInstance.state,
TaskInstance.dag_id,
TaskInstance.task_id,
@@ -2460,22 +2460,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
.filter(TaskInstance.state.in_(metric_states))
.group_by(TaskInstance.state, TaskInstance.dag_id,
TaskInstance.task_id, TaskInstance.queue)
- .all()
)
+ all_states_metric = session.execute(stmt).all()
for state in metric_states:
if state not in self.previous_ti_metrics:
self.previous_ti_metrics[state] = {}
ti_metrics = {
- (row.dag_id, row.task_id, row.queue): row.count
- for row in all_states_metric
- if row.state == state
+ (dag_id, task_id, queue): count
+ for row_state, dag_id, task_id, queue, count in
all_states_metric
+ if row_state == state
}
for (dag_id, task_id, queue), count in ti_metrics.items():
- Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", count)
- Stats.gauge(f"ti.{state}", count, tags={"queue": queue,
"dag_id": dag_id, "task_id": task_id})
+ Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}",
float(count))
+ Stats.gauge(
+ f"ti.{state}", float(count), tags={"queue": queue,
"dag_id": dag_id, "task_id": task_id}
+ )
for prev_key in self.previous_ti_metrics[state]:
# Reset previously exported stats that are no longer present
in current metrics to zero