This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fe0633d729c Fix metric emitting for triggerer and dag processor
(#61154)
fe0633d729c is described below
commit fe0633d729c85131ca96aa41a8c56282a407b7d5
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Jan 30 10:11:34 2026 -0800
Fix metric emitting for triggerer and dag processor (#61154)
After a recent refactoring (#53722) a call to an initializer for stats
is required but was not done for the dag processor or triggerer
---
airflow-core/src/airflow/dag_processing/manager.py | 6 ++++
.../src/airflow/jobs/triggerer_job_runner.py | 5 +++
.../tests/unit/dag_processing/test_manager.py | 14 ++++++++
airflow-core/tests/unit/jobs/test_triggerer_job.py | 37 ++++++++++++++++++++++
4 files changed, 62 insertions(+)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index c6c547c8258..d77503a5885 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -271,6 +271,12 @@ class DagFileProcessorManager(LoggingMixin):
# Related: https://github.com/apache/airflow/pull/57459
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+ Stats.initialize(
+ is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
+ is_statsd_on=conf.getboolean("metrics", "statsd_on"),
+ is_otel_on=conf.getboolean("metrics", "otel_on"),
+ )
+
self.register_exit_signals()
self.log.info("Processing files using up to %s processes at a time ",
self._parallelism)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index ee32253f495..57660c6820e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -166,6 +166,11 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
def _execute(self) -> int | None:
self.log.info("Starting the triggerer")
self.register_signals()
+ Stats.initialize(
+ is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
+ is_statsd_on=conf.getboolean("metrics", "statsd_on"),
+ is_otel_on=conf.getboolean("metrics", "otel_on"),
+ )
try:
# Kick off runner sub-process without DB access
self.trigger_runner = TriggerRunnerSupervisor.start(
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index cd06a2820ff..4eab5a5f455 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1569,3 +1569,17 @@ class TestDagFileProcessorManager:
mock_process_start.assert_called_once()
call_kwargs = mock_process_start.call_args.kwargs
assert call_kwargs["bundle_name"] == "testing"
+
+ @mock.patch("airflow.dag_processing.manager.Stats.initialize")
+ def test_stats_initialize_called_on_run(self, stats_init_mock, tmp_path,
configure_testing_dag_bundle):
+ """Test that Stats.initialize() is called when
DagFileProcessorManager.run() is executed."""
+ with configure_testing_dag_bundle(tmp_path):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.run()
+
+ # Verify Stats.initialize was called with the expected configuration
parameters
+ stats_init_mock.assert_called_once()
+ call_kwargs = stats_init_mock.call_args.kwargs
+ assert "is_statsd_datadog_enabled" in call_kwargs
+ assert "is_statsd_on" in call_kwargs
+ assert "is_otel_on" in call_kwargs
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 83c71d845ac..3dc6dfc9f2e 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1201,6 +1201,43 @@ def
test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_bu
supervisor.stdin.write.assert_not_called()
+class TestTriggererJobRunner:
+ @patch("airflow.jobs.triggerer_job_runner.Stats.initialize")
+ @patch.object(TriggerRunnerSupervisor, "start")
+ def test_stats_initialize_called_on_execute(self, mock_supervisor_start,
stats_init_mock, session):
+ """Test that Stats.initialize() is called when
TriggererJobRunner._execute() is executed."""
+ # Setup mock supervisor to immediately stop
+ mock_supervisor = MagicMock()
+ mock_supervisor.stop = False
+ mock_supervisor._exit_code = None
+ mock_supervisor.is_alive.return_value = True
+ mock_supervisor.run.side_effect = lambda: setattr(mock_supervisor,
"stop", True)
+ mock_supervisor_start.return_value = mock_supervisor
+
+ job = Job()
+ session.add(job)
+ session.flush()
+
+ job_runner = TriggererJobRunner(job)
+ job_runner.trigger_runner = mock_supervisor
+ mock_supervisor.stop = True # Stop immediately
+
+ # We don't need to run the full _execute, just verify Stats.initialize
is called
+ # before TriggerRunnerSupervisor.start
+ with patch.object(job_runner, "register_signals"):
+ try:
+ job_runner._execute()
+ except Exception:
+ pass # We expect this to fail since we're mocking
+
+ # Verify Stats.initialize was called with the expected configuration
parameters
+ stats_init_mock.assert_called_once()
+ call_kwargs = stats_init_mock.call_args.kwargs
+ assert "is_statsd_datadog_enabled" in call_kwargs
+ assert "is_statsd_on" in call_kwargs
+ assert "is_otel_on" in call_kwargs
+
+
class TestTriggererMessageTypes:
def test_message_types_in_triggerer(self):
"""