This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 809f59722db Refactor scheduler_dag_execution_timing.py to use SQLA2 (#59419)
809f59722db is described below

commit 809f59722dbed0de04ea2ea7effed7648ccefba1
Author: Yeonguk Choo <[email protected]>
AuthorDate: Tue Dec 16 13:55:08 2025 +0900

    Refactor scheduler_dag_execution_timing.py to use SQLA2 (#59419)
---
 .pre-commit-config.yaml                            | 1 +
 dev/airflow_perf/scheduler_dag_execution_timing.py | 9 +++++----
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 46d505f55ab..f2221ed27e1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -413,6 +413,7 @@ repos:
             ^airflow-ctl.*\.py$|
             ^airflow-core/src/airflow/models/.*\.py$|
             ^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
+            ^dev/airflow_perf/scheduler_dag_execution_timing.py$|
             ^providers/openlineage/.*\.py$|
             ^task_sdk.*\.py$
         pass_filenames: true
diff --git a/dev/airflow_perf/scheduler_dag_execution_timing.py b/dev/airflow_perf/scheduler_dag_execution_timing.py
index 47eaf92d7c8..48427f35ce0 100755
--- a/dev/airflow_perf/scheduler_dag_execution_timing.py
+++ b/dev/airflow_perf/scheduler_dag_execution_timing.py
@@ -27,6 +27,7 @@ from argparse import Namespace
 from operator import attrgetter
 
 import rich_click as click
+from sqlalchemy import delete, update
 
 from airflow.jobs.job import run_job
 from airflow.utils.types import DagRunTriggeredByType
@@ -135,9 +136,9 @@ def reset_dag(dag, session):
     TI = airflow.models.TaskInstance
     dag_id = dag.dag_id
 
-    session.query(DM).filter(DM.dag_id == dag_id).update({"is_paused": False})
-    session.query(DR).filter(DR.dag_id == dag_id).delete()
-    session.query(TI).filter(TI.dag_id == dag_id).delete()
+    session.execute(update(DM).where(DM.dag_id == dag_id).values(is_paused=False))
+    session.execute(delete(DR).where(DR.dag_id == dag_id))
+    session.execute(delete(TI).where(TI.dag_id == dag_id))
 
 
 def pause_all_dags(session):
@@ -146,7 +147,7 @@ def pause_all_dags(session):
     """
     from airflow.models.dag import DagModel
 
-    session.query(DagModel).update({"is_paused": True})
+    session.execute(update(DagModel).values(is_paused=True))
 
 
 def create_dag_runs(dag, num_runs, session):

Reply via email to