This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 842c60a7cf2 Fix broken stat scheduler_loop_duration (#42886) (#43544)
842c60a7cf2 is described below
commit 842c60a7cf276abd12f1c1ea04744a13837bda98
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Oct 31 11:57:19 2024 +0100
Fix broken stat scheduler_loop_duration (#42886) (#43544)
* wip
* wip
* fix lint err
---------
Co-authored-by: venkat <[email protected]>
(cherry picked from commit 60b8056616e94f987c3096dff3b59211d649b4b0)
Co-authored-by: Venkat VJ <[email protected]>
---
airflow/jobs/scheduler_job_runner.py | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index dd8ff4377f5..4f8e900df13 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1105,16 +1105,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
for loop_count in itertools.count(start=1):
- with Trace.start_span(span_name="scheduler_job_loop",
component="SchedulerJobRunner") as span:
+ with Trace.start_span(
+ span_name="scheduler_job_loop", component="SchedulerJobRunner"
+ ) as span, Stats.timer("scheduler.scheduler_loop_duration") as
timer:
span.set_attribute("category", "scheduler")
span.set_attribute("loop_count", loop_count)
- with Stats.timer("scheduler.scheduler_loop_duration") as timer:
- if self.using_sqlite and self.processor_agent:
- self.processor_agent.run_single_parsing_loop()
- # For the sqlite case w/ 1 thread, wait until the
processor
- # is finished to avoid concurrent access to the DB.
- self.log.debug("Waiting for processors to finish since
we're using sqlite")
- self.processor_agent.wait_until_finished()
+
+ if self.using_sqlite and self.processor_agent:
+ self.processor_agent.run_single_parsing_loop()
+ # For the sqlite case w/ 1 thread, wait until the processor
+ # is finished to avoid concurrent access to the DB.
+ self.log.debug("Waiting for processors to finish since
we're using sqlite")
+ self.processor_agent.wait_until_finished()
with create_session() as session:
# This will schedule for as many executors as possible.