This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 809f59722db Refactor scheduler_dag_execution_timing.py to use SQLA2 (#59419)
809f59722db is described below
commit 809f59722dbed0de04ea2ea7effed7648ccefba1
Author: Yeonguk Choo <[email protected]>
AuthorDate: Tue Dec 16 13:55:08 2025 +0900
Refactor scheduler_dag_execution_timing.py to use SQLA2 (#59419)
---
.pre-commit-config.yaml | 1 +
dev/airflow_perf/scheduler_dag_execution_timing.py | 9 +++++----
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 46d505f55ab..f2221ed27e1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -413,6 +413,7 @@ repos:
^airflow-ctl.*\.py$|
^airflow-core/src/airflow/models/.*\.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
+ ^dev/airflow_perf/scheduler_dag_execution_timing.py$|
^providers/openlineage/.*\.py$|
^task_sdk.*\.py$
pass_filenames: true
diff --git a/dev/airflow_perf/scheduler_dag_execution_timing.py b/dev/airflow_perf/scheduler_dag_execution_timing.py
index 47eaf92d7c8..48427f35ce0 100755
--- a/dev/airflow_perf/scheduler_dag_execution_timing.py
+++ b/dev/airflow_perf/scheduler_dag_execution_timing.py
@@ -27,6 +27,7 @@ from argparse import Namespace
from operator import attrgetter
import rich_click as click
+from sqlalchemy import delete, update
from airflow.jobs.job import run_job
from airflow.utils.types import DagRunTriggeredByType
@@ -135,9 +136,9 @@ def reset_dag(dag, session):
TI = airflow.models.TaskInstance
dag_id = dag.dag_id
- session.query(DM).filter(DM.dag_id == dag_id).update({"is_paused": False})
- session.query(DR).filter(DR.dag_id == dag_id).delete()
- session.query(TI).filter(TI.dag_id == dag_id).delete()
+ session.execute(update(DM).where(DM.dag_id == dag_id).values(is_paused=False))
+ session.execute(delete(DR).where(DR.dag_id == dag_id))
+ session.execute(delete(TI).where(TI.dag_id == dag_id))
def pause_all_dags(session):
@@ -146,7 +147,7 @@ def pause_all_dags(session):
"""
from airflow.models.dag import DagModel
- session.query(DagModel).update({"is_paused": True})
+ session.execute(update(DagModel).values(is_paused=True))
def create_dag_runs(dag, num_runs, session):