This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fd42ff015 Save scheduler execution time during search for queued 
dag_runs (#30699)
0fd42ff015 is described below

commit 0fd42ff015be02d1a6a6c2e1a080f8267194b3a5
Author: AutomationDev85 <[email protected]>
AuthorDate: Thu May 25 21:11:26 2023 +0200

    Save scheduler execution time during search for queued dag_runs (#30699)
    
    * Function returns list of dagruns and not query
    
    * Changed pytests
    
    * Changed all to _start_queued_dagruns
    
    * Added comment and fixed tests
    
    * Fixed typo
---
 airflow/jobs/scheduler_job_runner.py | 3 ++-
 tests/jobs/test_scheduler_job.py     | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 44ae2ce34d..da40d9eea7 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1298,7 +1298,8 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
 
     def _start_queued_dagruns(self, session: Session) -> None:
         """Find DagRuns in queued state and decide moving them to running 
state."""
-        dag_runs: Collection[DagRun] = 
self._get_next_dagruns_to_examine(DagRunState.QUEUED, session)
+        # added all() to save runtime, otherwise query is executed more than 
once
+        dag_runs: Collection[DagRun] = 
self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all()
 
         active_runs_of_dags = Counter(
             DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), 
only_running=True, session=session),
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5be7612790..8d28e01867 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5141,7 +5141,9 @@ class TestSchedulerJobQueriesCount:
 
             with assert_queries_count(expected_query_count, margin=15):
                 with mock.patch.object(DagRun, "next_dagruns_to_examine") as 
mock_dagruns:
-                    mock_dagruns.return_value = dagruns
+                    query = MagicMock()
+                    query.all.return_value = dagruns
+                    mock_dagruns.return_value = query
 
                     self.job_runner._run_scheduler_loop()
 

Reply via email to