This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch sync_2-10-test-rc2 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5854ff060cc3a8ff722c9923cf44e0d2173b7879 Author: Jarek Potiuk <[email protected]> AuthorDate: Thu Oct 31 11:57:19 2024 +0100 Fix broken stat scheduler_loop_duration (#42886) (#43544) * wip * wip * fix lint err --------- Co-authored-by: venkat <[email protected]> (cherry picked from commit 60b8056616e94f987c3096dff3b59211d649b4b0) Co-authored-by: Venkat VJ <[email protected]> (cherry picked from commit 842c60a7cf276abd12f1c1ea04744a13837bda98) --- airflow/jobs/scheduler_job_runner.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index dd8ff4377f..4f8e900df1 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1105,16 +1105,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): ) for loop_count in itertools.count(start=1): - with Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span: + with Trace.start_span( + span_name="scheduler_job_loop", component="SchedulerJobRunner" + ) as span, Stats.timer("scheduler.scheduler_loop_duration") as timer: span.set_attribute("category", "scheduler") span.set_attribute("loop_count", loop_count) - with Stats.timer("scheduler.scheduler_loop_duration") as timer: - if self.using_sqlite and self.processor_agent: - self.processor_agent.run_single_parsing_loop() - # For the sqlite case w/ 1 thread, wait until the processor - # is finished to avoid concurrent access to the DB. - self.log.debug("Waiting for processors to finish since we're using sqlite") - self.processor_agent.wait_until_finished() + + if self.using_sqlite and self.processor_agent: + self.processor_agent.run_single_parsing_loop() + # For the sqlite case w/ 1 thread, wait until the processor + # is finished to avoid concurrent access to the DB. + self.log.debug("Waiting for processors to finish since we're using sqlite") + self.processor_agent.wait_until_finished() with create_session() as session: # This will schedule for as many executors as possible.
