This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2956c98ecb6 feat(metrics): wrap executor.heartbeat() in a timer to localize loop slowdowns (#66808)
2956c98ecb6 is described below

commit 2956c98ecb6220ec7aad473af84ff724dd934bf9
Author: Stefan Wang <[email protected]>
AuthorDate: Tue May 12 14:58:53 2026 -0700

    feat(metrics): wrap executor.heartbeat() in a timer to localize loop slowdowns (#66808)
    
    Emit scheduler.executor_heartbeat_duration as a per-executor timer so
    operators can see whether executor.heartbeat() is the bottleneck of a
    slow scheduler loop, instead of inferring from the aggregate
    scheduler.scheduler_loop_duration.
    
    Tagged by type(executor).__name__ so multi-executor deployments
    attribute the cost to each configured executor separately.
    
    Closes #66803
    
    Signed-off-by: 1fanwang <[email protected]>
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py    |  6 +++++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py       | 16 ++++++++++++++++
 .../observability/metrics/metrics_template.yaml          |  7 +++++++
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 3eed95a8bb0..1a3f55b7f6f 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1665,7 +1665,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 # either a no-op, or they will check-in on currently running tasks and send out new
                 # events to be processed below.
                 for executor in self.executors:
-                    executor.heartbeat()
+                    with stats.timer(
+                        "scheduler.executor_heartbeat_duration",
+                        tags={"executor": type(executor).__name__},
+                    ):
+                        executor.heartbeat()
 
                 with create_session() as session:
                     num_finished_events = 0
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 56e69459104..eb763635ddf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -1181,6 +1181,22 @@ class TestSchedulerJob:
             for executor in self.job_runner.executors:
                 executor.heartbeat.assert_called_once()
 
+    def test_executor_heartbeat_emits_timer(self, mock_executors, configure_testing_dag_bundle):
+        with configure_testing_dag_bundle(os.devnull):
+            scheduler_job = Job()
+            self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1)
+            with patch("airflow.jobs.scheduler_job_runner.stats.timer") as mock_timer:
+                self.job_runner._execute()
+
+            heartbeat_calls = [
+                timer_call
+                for timer_call in mock_timer.call_args_list
+                if timer_call.args and timer_call.args[0] == "scheduler.executor_heartbeat_duration"
+            ]
+            assert len(heartbeat_calls) == len(self.job_runner.executors)
+            for executor, timer_call in zip(self.job_runner.executors, heartbeat_calls):
+                assert timer_call.kwargs.get("tags") == {"executor": type(executor).__name__}
+
     def test_executor_events_processed(self, mock_executors, configure_testing_dag_bundle):
         with configure_testing_dag_bundle(os.devnull):
             scheduler_job = Job()
diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index f4c48834927..f7ddd2b1243 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -604,6 +604,13 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "scheduler.executor_heartbeat_duration"
+    description: "Milliseconds spent in ``executor.heartbeat()`` per scheduler loop iteration, tagged
+      by executor class name so each configured executor is reported separately."
+    type: "timer"
+    legacy_name: "-"
+    name_variables: []
+
   - name: "dagrun.first_task_scheduling_delay"
     description: "Milliseconds elapsed between first task start_date and dagrun expected start"
     type: "timer"

Reply via email to