This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fe0633d729c Fix metric emitting for triggerer and dag processor 
(#61154)
fe0633d729c is described below

commit fe0633d729c85131ca96aa41a8c56282a407b7d5
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Jan 30 10:11:34 2026 -0800

    Fix metric emitting for triggerer and dag processor (#61154)
    
    After a recent refactoring (#53722), a call to the stats initializer
    (Stats.initialize) is required, but it was never made for the dag
    processor or the triggerer, which broke metric emitting for both.
---
 airflow-core/src/airflow/dag_processing/manager.py |  6 ++++
 .../src/airflow/jobs/triggerer_job_runner.py       |  5 +++
 .../tests/unit/dag_processing/test_manager.py      | 14 ++++++++
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 37 ++++++++++++++++++++++
 4 files changed, 62 insertions(+)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index c6c547c8258..d77503a5885 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -271,6 +271,12 @@ class DagFileProcessorManager(LoggingMixin):
         # Related: https://github.com/apache/airflow/pull/57459
         os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
 
+        Stats.initialize(
+            is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
+            is_statsd_on=conf.getboolean("metrics", "statsd_on"),
+            is_otel_on=conf.getboolean("metrics", "otel_on"),
+        )
+
         self.register_exit_signals()
 
         self.log.info("Processing files using up to %s processes at a time ", 
self._parallelism)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index ee32253f495..57660c6820e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -166,6 +166,11 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
     def _execute(self) -> int | None:
         self.log.info("Starting the triggerer")
         self.register_signals()
+        Stats.initialize(
+            is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
+            is_statsd_on=conf.getboolean("metrics", "statsd_on"),
+            is_otel_on=conf.getboolean("metrics", "otel_on"),
+        )
         try:
             # Kick off runner sub-process without DB access
             self.trigger_runner = TriggerRunnerSupervisor.start(
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index cd06a2820ff..4eab5a5f455 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1569,3 +1569,17 @@ class TestDagFileProcessorManager:
         mock_process_start.assert_called_once()
         call_kwargs = mock_process_start.call_args.kwargs
         assert call_kwargs["bundle_name"] == "testing"
+
+    @mock.patch("airflow.dag_processing.manager.Stats.initialize")
+    def test_stats_initialize_called_on_run(self, stats_init_mock, tmp_path, 
configure_testing_dag_bundle):
+        """Test that Stats.initialize() is called when 
DagFileProcessorManager.run() is executed."""
+        with configure_testing_dag_bundle(tmp_path):
+            manager = DagFileProcessorManager(max_runs=1)
+            manager.run()
+
+        # Verify Stats.initialize was called with the expected configuration 
parameters
+        stats_init_mock.assert_called_once()
+        call_kwargs = stats_init_mock.call_args.kwargs
+        assert "is_statsd_datadog_enabled" in call_kwargs
+        assert "is_statsd_on" in call_kwargs
+        assert "is_otel_on" in call_kwargs
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 83c71d845ac..3dc6dfc9f2e 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1201,6 +1201,43 @@ def 
test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_bu
     supervisor.stdin.write.assert_not_called()
 
 
+class TestTriggererJobRunner:
+    @patch("airflow.jobs.triggerer_job_runner.Stats.initialize")
+    @patch.object(TriggerRunnerSupervisor, "start")
+    def test_stats_initialize_called_on_execute(self, mock_supervisor_start, 
stats_init_mock, session):
+        """Test that Stats.initialize() is called when 
TriggererJobRunner._execute() is executed."""
+        # Setup mock supervisor to immediately stop
+        mock_supervisor = MagicMock()
+        mock_supervisor.stop = False
+        mock_supervisor._exit_code = None
+        mock_supervisor.is_alive.return_value = True
+        mock_supervisor.run.side_effect = lambda: setattr(mock_supervisor, 
"stop", True)
+        mock_supervisor_start.return_value = mock_supervisor
+
+        job = Job()
+        session.add(job)
+        session.flush()
+
+        job_runner = TriggererJobRunner(job)
+        job_runner.trigger_runner = mock_supervisor
+        mock_supervisor.stop = True  # Stop immediately
+
+        # We don't need to run the full _execute, just verify Stats.initialize 
is called
+        # before TriggerRunnerSupervisor.start
+        with patch.object(job_runner, "register_signals"):
+            try:
+                job_runner._execute()
+            except Exception:
+                pass  # We expect this to fail since we're mocking
+
+        # Verify Stats.initialize was called with the expected configuration 
parameters
+        stats_init_mock.assert_called_once()
+        call_kwargs = stats_init_mock.call_args.kwargs
+        assert "is_statsd_datadog_enabled" in call_kwargs
+        assert "is_statsd_on" in call_kwargs
+        assert "is_otel_on" in call_kwargs
+
+
 class TestTriggererMessageTypes:
     def test_message_types_in_triggerer(self):
         """

Reply via email to