This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c05dae7107 Add heartbeat metric for DAG processor  (#42398)
c05dae7107 is described below

commit c05dae7107cd4b49c8a2bb58c29bfc3ef8ce2c79
Author: Kalyan <[email protected]>
AuthorDate: Wed Oct 2 03:29:54 2024 +0530

    Add heartbeat metric for DAG processor  (#42398)
    
    
    ---------
    
    Signed-off-by: kalyanr <[email protected]>
---
 airflow/jobs/dag_processor_job_runner.py                 | 16 ++++++++++------
 chart/files/statsd-mappings.yml                          |  6 ++++++
 .../logging-monitoring/metrics.rst                       |  1 +
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/dag_processor_job_runner.py 
b/airflow/jobs/dag_processor_job_runner.py
index 76b2ab5925..28128efba4 100644
--- a/airflow/jobs/dag_processor_job_runner.py
+++ b/airflow/jobs/dag_processor_job_runner.py
@@ -17,18 +17,18 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import Job, perform_heartbeat
+from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import NEW_SESSION, provide_session
 
 if TYPE_CHECKING:
-    from airflow.dag_processing.manager import DagFileProcessorManager
-
+    from sqlalchemy.orm import Session
 
-def empty_callback(_: Any) -> None:
-    pass
+    from airflow.dag_processing.manager import DagFileProcessorManager
 
 
 class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
@@ -52,7 +52,7 @@ class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
         self.processor = processor
         self.processor.heartbeat = lambda: perform_heartbeat(
             job=self.job,
-            heartbeat_callback=empty_callback,
+            heartbeat_callback=self.heartbeat_callback,
             only_if_necessary=True,
         )
 
@@ -67,3 +67,7 @@ class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
             self.processor.terminate()
             self.processor.end()
         return None
+
+    @provide_session
+    def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
+        Stats.incr("dag_processor_heartbeat", 1, 1)
diff --git a/chart/files/statsd-mappings.yml b/chart/files/statsd-mappings.yml
index 86d773fd20..cef9593dd1 100644
--- a/chart/files/statsd-mappings.yml
+++ b/chart/files/statsd-mappings.yml
@@ -46,6 +46,12 @@ mappings:
     labels:
       type: counter
 
+  - match: airflow.dag_processor_heartbeat
+    match_type: regex
+    name: "airflow_dag_processor_heartbeat"
+    labels:
+      type: counter
+
   - match: airflow.dag.*.*.duration
     name: "airflow_task_duration"
     labels:
diff --git 
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
 
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index ac44d1acba..079aa5d397 100644
--- 
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ 
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -159,6 +159,7 @@ Name                                                        
           Descripti
 ``previously_succeeded``                                               Number 
of previously succeeded task instances. Metric with dag_id and task_id tagging.
 ``zombies_killed``                                                     Zombie 
tasks killed. Metric with dag_id and task_id tagging.
 ``scheduler_heartbeat``                                                
Scheduler heartbeats
+``dag_processor_heartbeat``                                            
Standalone DAG processor heartbeats
 ``dag_processing.processes``                                           
Relative number of currently running DAG parsing processes (ie this delta
                                                                        is 
negative when, since the last metric was sent, processes have completed).
                                                                        Metric 
with file_path and action tagging.

Reply via email to