This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0fd42ff015 Save scheduler execution time during search for queued dag_runs (#30699)
0fd42ff015 is described below
commit 0fd42ff015be02d1a6a6c2e1a080f8267194b3a5
Author: AutomationDev85 <[email protected]>
AuthorDate: Thu May 25 21:11:26 2023 +0200
Save scheduler execution time during search for queued dag_runs (#30699)
* Function returns list of dagruns and not query
* Changed pytests
* Changed all to _start_queued_dagruns
* Added comment and fixed tests
* Fixed typo
---
airflow/jobs/scheduler_job_runner.py | 3 ++-
tests/jobs/test_scheduler_job.py | 4 +++-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 44ae2ce34d..da40d9eea7 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1298,7 +1298,8 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
- dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session)
+ # added all() to save runtime, otherwise query is executed more than once
+ dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all()
active_runs_of_dags = Counter(
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5be7612790..8d28e01867 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5141,7 +5141,9 @@ class TestSchedulerJobQueriesCount:
with assert_queries_count(expected_query_count, margin=15):
with mock.patch.object(DagRun, "next_dagruns_to_examine") as mock_dagruns:
- mock_dagruns.return_value = dagruns
+ query = MagicMock()
+ query.all.return_value = dagruns
+ mock_dagruns.return_value = query
self.job_runner._run_scheduler_loop()