This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d294caab3d Refactor imports from 'airflow-core' out of 'shared/stats' 
(#62127)
0d294caab3d is described below

commit 0d294caab3d494eeff6184b09e8e2dda6ffc47ed
Author: Christos Bisias <[email protected]>
AuthorDate: Fri Feb 20 02:18:46 2026 +0200

    Refactor imports from 'airflow-core' out of 'shared/stats' (#62127)
    
    * refactor core imports out of stats
    
    * remove unused param
    
    * skip unit tests if datadog isn't available
    
    * fix import in task_runner
    
    * add a test helper for the statsd_logger factory
---
 airflow-core/src/airflow/dag_processing/manager.py |   8 +-
 .../src/airflow/executors/base_executor.py         |   8 +-
 .../src/airflow/jobs/scheduler_job_runner.py       |   8 +-
 .../src/airflow/jobs/triggerer_job_runner.py       |   8 +-
 .../airflow/observability/metrics/stats_utils.py   |  40 +++
 .../tests/unit/dag_processing/test_manager.py      |   4 +-
 airflow-core/tests/unit/jobs/test_triggerer_job.py |   4 +-
 .../tests/unit/observability/metrics/__init__.py   |  16 ++
 .../tests/unit/observability/metrics/test_stats.py |  97 +++++++
 .../observability/metrics/base_stats_logger.py     |  11 +-
 .../airflow_shared/observability/metrics/stats.py  |  21 +-
 .../tests/observability/metrics/test_stats.py      | 304 ++++++++-------------
 .../src/airflow/sdk/execution_time/task_runner.py  |   8 +-
 .../sdk/observability/metrics/otel_logger.py       |   2 +-
 .../sdk/observability/metrics/stats_utils.py       |  40 +++
 .../sdk/observability/metrics/statsd_logger.py     |   2 +-
 task-sdk/src/airflow/sdk/serde/__init__.py         |   8 +-
 17 files changed, 328 insertions(+), 261 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 657595f58d7..c0dff728806 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -61,6 +61,7 @@ from airflow.models.dagbundle import DagBundleModel
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ParseImportError
+from airflow.observability.metrics import stats_utils
 from airflow.observability.trace import DebugTrace
 from airflow.sdk import SecretCache
 from airflow.sdk.log import init_log_file, logging_processors
@@ -271,11 +272,8 @@ class DagFileProcessorManager(LoggingMixin):
         # Related: https://github.com/apache/airflow/pull/57459
         os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
 
-        Stats.initialize(
-            is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
-            is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-            is_otel_on=conf.getboolean("metrics", "otel_on"),
-        )
+        stats_factory = stats_utils.get_stats_factory(Stats)
+        Stats.initialize(factory=stats_factory)
 
         self.register_exit_signals()
 
diff --git a/airflow-core/src/airflow/executors/base_executor.py 
b/airflow-core/src/airflow/executors/base_executor.py
index 927e4c07d64..a57790744b7 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -34,6 +34,7 @@ from airflow.configuration import conf
 from airflow.executors import workloads
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models import Log
+from airflow.observability.metrics import stats_utils
 from airflow.observability.trace import DebugTrace, Trace, add_debug_span
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import TaskInstanceState
@@ -175,11 +176,8 @@ class BaseExecutor(LoggingMixin):
         return generator
 
     def __init__(self, parallelism: int = PARALLELISM, team_name: str | None = 
None):
-        Stats.initialize(
-            is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
-            is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-            is_otel_on=conf.getboolean("metrics", "otel_on"),
-        )
+        stats_factory = stats_utils.get_stats_factory(Stats)
+        Stats.initialize(factory=stats_factory)
         super().__init__()
         # Ensure we set this now, so that each subprocess gets the same value
         from airflow.api_fastapi.auth.tokens import get_signing_args
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d592b551d95..9b77ce4592c 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -97,6 +97,7 @@ from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.team import Team
 from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, 
TriggerFailureReason
+from airflow.observability.metrics import stats_utils
 from airflow.observability.trace import DebugTrace, Trace, add_debug_span
 from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
 from airflow.serialization.definitions.notset import NOTSET
@@ -1322,11 +1323,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             DagRun.set_active_spans(active_spans=self.active_spans)
             BaseExecutor.set_active_spans(active_spans=self.active_spans)
 
-            Stats.initialize(
-                is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
-                is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-                is_otel_on=conf.getboolean("metrics", "otel_on"),
-            )
+            stats_factory = stats_utils.get_stats_factory(Stats)
+            Stats.initialize(factory=stats_factory)
 
             self._run_scheduler_loop()
 
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index c5ac4e41f6e..ba083b8fad3 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -48,6 +48,7 @@ from airflow.executors import workloads
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import perform_heartbeat
 from airflow.models.trigger import Trigger
+from airflow.observability.metrics import stats_utils
 from airflow.observability.trace import DebugTrace, Trace, add_debug_span
 from airflow.sdk.api.datamodels._generated import HITLDetailResponse
 from airflow.sdk.execution_time.comms import (
@@ -167,11 +168,8 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
     def _execute(self) -> int | None:
         self.log.info("Starting the triggerer")
         self.register_signals()
-        Stats.initialize(
-            is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
-            is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-            is_otel_on=conf.getboolean("metrics", "otel_on"),
-        )
+        stats_factory = stats_utils.get_stats_factory(Stats)
+        Stats.initialize(factory=stats_factory)
         try:
             # Kick off runner sub-process without DB access
             self.trigger_runner = TriggerRunnerSupervisor.start(
diff --git a/airflow-core/src/airflow/observability/metrics/stats_utils.py 
b/airflow-core/src/airflow/observability/metrics/stats_utils.py
new file mode 100644
index 00000000000..a474de333fd
--- /dev/null
+++ b/airflow-core/src/airflow/observability/metrics/stats_utils.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections.abc import Callable
+
+from airflow._shared.observability.metrics.base_stats_logger import 
NoStatsLogger
+from airflow.configuration import conf
+
+
+def get_stats_factory(stats_cls) -> Callable:
+    if conf.getboolean("metrics", "statsd_datadog_enabled"):
+        from airflow.observability.metrics import datadog_logger
+
+        # Datadog needs the 'stats_cls' param, so wrap it into a 0-arg factory.
+        return lambda: datadog_logger.get_dogstatsd_logger(stats_cls)
+    if conf.getboolean("metrics", "statsd_on"):
+        from airflow.observability.metrics import statsd_logger
+
+        return statsd_logger.get_statsd_logger
+    if conf.getboolean("metrics", "otel_on"):
+        from airflow.observability.metrics import otel_logger
+
+        return otel_logger.get_otel_logger
+    return NoStatsLogger
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index d6cb9e819c0..dd8eb88cdb7 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1590,6 +1590,4 @@ class TestDagFileProcessorManager:
         # Verify Stats.initialize was called with the expected configuration 
parameters
         stats_init_mock.assert_called_once()
         call_kwargs = stats_init_mock.call_args.kwargs
-        assert "is_statsd_datadog_enabled" in call_kwargs
-        assert "is_statsd_on" in call_kwargs
-        assert "is_otel_on" in call_kwargs
+        assert "factory" in call_kwargs
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 3dc6dfc9f2e..7ba89562ad0 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1233,9 +1233,7 @@ class TestTriggererJobRunner:
         # Verify Stats.initialize was called with the expected configuration 
parameters
         stats_init_mock.assert_called_once()
         call_kwargs = stats_init_mock.call_args.kwargs
-        assert "is_statsd_datadog_enabled" in call_kwargs
-        assert "is_statsd_on" in call_kwargs
-        assert "is_otel_on" in call_kwargs
+        assert "factory" in call_kwargs
 
 
 class TestTriggererMessageTypes:
diff --git a/airflow-core/tests/unit/observability/metrics/__init__.py 
b/airflow-core/tests/unit/observability/metrics/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-core/tests/unit/observability/metrics/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow-core/tests/unit/observability/metrics/test_stats.py 
b/airflow-core/tests/unit/observability/metrics/test_stats.py
new file mode 100644
index 00000000000..1c919acae63
--- /dev/null
+++ b/airflow-core/tests/unit/observability/metrics/test_stats.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib
+import re
+
+import pytest
+
+import airflow
+import airflow.observability.stats
+from airflow.observability.metrics import stats_utils
+
+from tests_common.test_utils.config import conf_vars
+
+
+class InvalidCustomStatsd:
+    """
+    This custom Statsd class is invalid because it does not subclass
+    statsd.StatsClient.
+    """
+
+    def __init__(self, host=None, port=None, prefix=None):
+        pass
+
+
+class TestStats:
+    def test_load_invalid_custom_stats_client(self):
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_custom_client_path"): 
f"{__name__}.InvalidCustomStatsd",
+            }
+        ):
+            importlib.reload(airflow._shared.observability.metrics.stats)
+            factory = 
stats_utils.get_stats_factory(airflow.observability.stats.Stats)
+            airflow.observability.stats.Stats.initialize(factory=factory)
+            error_message = re.escape(
+                "Your custom StatsD client must extend the statsd."
+                "StatsClient in order to ensure backwards compatibility."
+            )
+            # we assert for Exception here instead of AirflowConfigException 
to not import from shared configuration
+            with pytest.raises(Exception, match=error_message):
+                airflow.observability.stats.Stats.incr("empty_key")
+        importlib.reload(airflow._shared.observability.metrics.stats)
+
+
+class TestDogStats:
+    def setup_method(self):
+        pytest.importorskip("datadog")
+
+    def test_enabled_by_config(self):
+        """Test that enabling this sets the right instance properties"""
+        from datadog import DogStatsd
+
+        with conf_vars(
+            {
+                ("metrics", "statsd_datadog_enabled"): "True",
+            }
+        ):
+            importlib.reload(airflow.observability.stats)
+            factory = 
stats_utils.get_stats_factory(airflow.observability.stats.Stats)
+            airflow.observability.stats.Stats.initialize(factory=factory)
+            assert isinstance(airflow.observability.stats.Stats.dogstatsd, 
DogStatsd)
+            assert not hasattr(airflow.observability.stats.Stats, "statsd")
+        # Avoid side-effects
+        importlib.reload(airflow.observability.stats)
+
+    def 
test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self):
+        from datadog import DogStatsd
+
+        with conf_vars(
+            {
+                ("metrics", "statsd_on"): "True",
+                ("metrics", "statsd_datadog_enabled"): "True",
+            }
+        ):
+            importlib.reload(airflow.observability.stats)
+            factory = 
stats_utils.get_stats_factory(airflow.observability.stats.Stats)
+            airflow.observability.stats.Stats.initialize(factory=factory)
+            assert isinstance(airflow.observability.stats.Stats.dogstatsd, 
DogStatsd)
+            assert not hasattr(airflow.observability.stats.Stats, "statsd")
+        importlib.reload(airflow.observability.stats)
diff --git 
a/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
 
b/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
index e15e2528e31..19aef0007d3 100644
--- 
a/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
+++ 
b/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+from collections.abc import Callable
 from typing import TYPE_CHECKING, Any, Protocol
 
 from .protocols import Timer
@@ -34,10 +35,7 @@ class StatsLogger(Protocol):
     def initialize(
         cls,
         *,
-        is_statsd_datadog_enabled: bool,
-        is_statsd_on: bool,
-        is_otel_on: bool,
-        reset_instance: bool = True,
+        factory: Callable,
     ) -> None:
         """Initialize the StatsLogger instance."""
 
@@ -98,10 +96,7 @@ class NoStatsLogger:
     def initialize(
         cls,
         *,
-        is_statsd_datadog_enabled: bool,
-        is_statsd_on: bool,
-        is_otel_on: bool,
-        reset_instance: bool = True,
+        factory: Callable,
     ) -> None:
         """Initialize the NoStatsLogger instance."""
 
diff --git 
a/shared/observability/src/airflow_shared/observability/metrics/stats.py 
b/shared/observability/src/airflow_shared/observability/metrics/stats.py
index 8055f8ed95d..be153c298d6 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/stats.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/stats.py
@@ -112,27 +112,8 @@ class _Stats(type):
 
         return getattr(instance, name)
 
-    def initialize(cls, *, is_statsd_datadog_enabled: bool, is_statsd_on: 
bool, is_otel_on: bool) -> None:
-        type.__setattr__(cls, "factory", None)
+    def initialize(cls, *, factory: Callable) -> None:
         type.__setattr__(cls, "instance", None)
-        factory: Callable
-
-        if is_statsd_datadog_enabled:
-            from airflow.observability.metrics import datadog_logger
-
-            # Datadog needs the cls param, so wrap it into a 0-arg factory.
-            factory = lambda: datadog_logger.get_dogstatsd_logger(cls)
-        elif is_statsd_on:
-            from airflow.observability.metrics import statsd_logger
-
-            factory = statsd_logger.get_statsd_logger
-        elif is_otel_on:
-            from airflow.observability.metrics import otel_logger
-
-            factory = otel_logger.get_otel_logger
-        else:
-            factory = NoStatsLogger
-
         type.__setattr__(cls, "factory", factory)
 
     @classmethod
diff --git a/shared/observability/tests/observability/metrics/test_stats.py 
b/shared/observability/tests/observability/metrics/test_stats.py
index 98f5ef24d54..173370f8f7d 100644
--- a/shared/observability/tests/observability/metrics/test_stats.py
+++ b/shared/observability/tests/observability/metrics/test_stats.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 
 import importlib
 import logging
-import re
 import time
+from collections.abc import Callable
 from unittest import mock
 from unittest.mock import Mock
 
@@ -31,6 +31,7 @@ import airflow_shared
 import airflow_shared.observability.metrics.stats
 import airflow_shared.observability.metrics.validators
 from airflow_shared.observability.exceptions import InvalidStatsNameException
+from airflow_shared.observability.metrics import datadog_logger, statsd_logger
 from airflow_shared.observability.metrics.datadog_logger import 
SafeDogStatsdLogger
 from airflow_shared.observability.metrics.statsd_logger import SafeStatsdLogger
 from airflow_shared.observability.metrics.validators import (
@@ -46,14 +47,21 @@ class CustomStatsd(statsd.StatsClient):
     pass
 
 
-class InvalidCustomStatsd:
-    """
-    This custom Statsd class is invalid because it does not subclass
-    statsd.StatsClient.
-    """
-
-    def __init__(self, host=None, port=None, prefix=None):
-        pass
+def get_statsd_logger_factory(
+    stats_class,
+    metrics_allow_list: str | None = None,
+    metrics_block_list: str | None = None,
+    stat_name_handler: Callable[[str], str] | None = None,
+):
+    return lambda: statsd_logger.get_statsd_logger(
+        stats_class=stats_class,
+        host="localhost",
+        port="1234",
+        prefix="airflow",
+        metrics_allow_list=metrics_allow_list,
+        metrics_block_list=metrics_block_list,
+        stat_name_handler=stat_name_handler,
+    )
 
 
 class TestStats:
@@ -102,127 +110,79 @@ class TestStats:
 
     def test_enabled_by_config(self):
         """Test that enabling this sets the right instance properties"""
-        with conf_vars({("metrics", "statsd_on"): "True"}):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
statsd.StatsClient)
-            assert not 
hasattr(airflow_shared.observability.metrics.stats.Stats, "dogstatsd")
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=get_statsd_logger_factory(stats_class=statsd.StatsClient)
+        )
+        assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
statsd.StatsClient)
+        assert not hasattr(airflow_shared.observability.metrics.stats.Stats, 
"dogstatsd")
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_custom_statsd_client(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "statsd_custom_client_path"): 
f"{__name__}.CustomStatsd",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
CustomStatsd)
-        # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
-
-    def test_load_invalid_custom_stats_client(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "statsd_custom_client_path"): 
f"{__name__}.InvalidCustomStatsd",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            error_message = re.escape(
-                "Your custom StatsD client must extend the statsd."
-                "StatsClient in order to ensure backwards compatibility."
-            )
-            # we assert for Exception here instead of AirflowConfigException 
to not import from shared configuration
-            with pytest.raises(Exception, match=error_message):
-                
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=get_statsd_logger_factory(stats_class=CustomStatsd)
+        )
+        assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
CustomStatsd)
+        # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_allow_list_validator(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "metrics_allow_list"): "name1,name2",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
-                PatternAllowListValidator,
-            )
-            assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
-                "name1",
-                "name2",
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=get_statsd_logger_factory(
+                stats_class=statsd.StatsClient,
+                metrics_allow_list="name1,name2",
             )
+        )
+        assert isinstance(
+            airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+            PatternAllowListValidator,
+        )
+        assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
+            "name1",
+            "name2",
+        )
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_block_list_validator(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "metrics_block_list"): "name1,name2",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
-                PatternBlockListValidator,
-            )
-            assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
-                "name1",
-                "name2",
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=get_statsd_logger_factory(
+                stats_class=statsd.StatsClient,
+                metrics_block_list="name1,name2",
             )
+        )
+        assert isinstance(
+            airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+            PatternBlockListValidator,
+        )
+        assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
+            "name1",
+            "name2",
+        )
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def 
test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "metrics_allow_list"): "name1,name2",
-                ("metrics", "metrics_block_list"): "name1,name2",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
-                PatternAllowListValidator,
-            )
-            assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
-                "name1",
-                "name2",
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=get_statsd_logger_factory(
+                stats_class=statsd.StatsClient,
+                metrics_allow_list="name1,name2",
+                metrics_block_list="name1,name2",
             )
+        )
+        assert isinstance(
+            airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+            PatternAllowListValidator,
+        )
+        assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
+            "name1",
+            "name2",
+        )
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
@@ -305,41 +265,6 @@ class TestDogStats:
             metric="empty", sample_rate=1, value=1, tags=[]
         )
 
-    def test_enabled_by_config(self):
-        """Test that enabling this sets the right instance properties"""
-        from datadog import DogStatsd
-
-        with conf_vars({("metrics", "statsd_datadog_enabled"): "True"}):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=True,
-                is_statsd_on=False,
-                is_otel_on=False,
-            )
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.dogstatsd, 
DogStatsd)
-            assert not 
hasattr(airflow_shared.observability.metrics.stats.Stats, "statsd")
-        # Avoid side-effects
-        importlib.reload(airflow_shared.observability.metrics.stats)
-
-    def 
test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self):
-        from datadog import DogStatsd
-
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "statsd_datadog_enabled"): "True",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=True,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.dogstatsd, 
DogStatsd)
-            assert not 
hasattr(airflow_shared.observability.metrics.stats.Stats, "statsd")
-        importlib.reload(airflow_shared.observability.metrics.stats)
-
 
 class TestStatsAllowAndBlockLists:
     @pytest.mark.parametrize(
@@ -399,57 +324,59 @@ class TestPatternValidatorConfigOption:
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
-    stats_on = {("metrics", "statsd_on"): "True"}
-    allow_list = {("metrics", "metrics_allow_list"): "foo,bar"}
-    block_list = {("metrics", "metrics_block_list"): "foo,bar"}
-
     @pytest.mark.parametrize(
-        ("config", "expected"),
+        ("allow_list", "block_list", "expected"),
         [
             pytest.param(
-                {**stats_on},
+                None,
+                None,
                 PatternAllowListValidator,
                 id="pattern_allow_by_default",
             ),
             pytest.param(
-                {**stats_on, **allow_list},
+                "foo,bar",
+                None,
                 PatternAllowListValidator,
                 id="pattern_allow_list_provided",
             ),
             pytest.param(
-                {**stats_on, **block_list},
+                None,
+                "foo,bar",
                 PatternBlockListValidator,
                 id="pattern_block_list_provided",
             ),
             pytest.param(
-                {**stats_on, **allow_list, **block_list},
+                "foo,bar",
+                "foo,bar",
                 PatternAllowListValidator,
-                id="pattern_block_list_provided",
+                id="pattern_both_lists_provided",
             ),
         ],
     )
-    def test_pattern_picker(self, config, expected):
-        with conf_vars(config):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
+    def test_pattern_picker(self, allow_list, block_list, expected):
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=get_statsd_logger_factory(
+                stats_class=statsd.StatsClient,
+                metrics_allow_list=allow_list,
+                metrics_block_list=block_list,
             )
+        )
 
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
statsd.StatsClient)
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.instance.metrics_validator, 
expected
-            )
+        assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
statsd.StatsClient)
+        assert isinstance(
+            
airflow_shared.observability.metrics.stats.Stats.instance.metrics_validator, 
expected
+        )
 
-    @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): 
"baz,qux"})
     def test_setting_allow_and_block_logs_warning(self, caplog):
         with caplog.at_level(logging.WARNING):
             importlib.reload(airflow_shared.observability.metrics.stats)
             airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
+                factory=get_statsd_logger_factory(
+                    stats_class=statsd.StatsClient,
+                    metrics_allow_list="baz,qux",
+                    metrics_block_list="foo,bar",
+                )
             )
 
             assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
statsd.StatsClient)
@@ -598,43 +525,30 @@ class TestCustomStatsName:
         airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
         mock_dogstatsd.return_value.assert_not_called()
 
-    @conf_vars(
-        {
-            ("metrics", "statsd_on"): "True",
-            (
-                "metrics",
-                "stat_name_handler",
-            ): "observability.metrics.test_stats.always_valid",
-        }
-    )
     @mock.patch("statsd.StatsClient")
     def test_does_send_stats_using_statsd_when_the_name_is_valid(self, 
mock_statsd):
         importlib.reload(airflow_shared.observability.metrics.stats)
         airflow_shared.observability.metrics.stats.Stats.initialize(
-            is_statsd_datadog_enabled=False,
-            is_statsd_on=True,
-            is_otel_on=False,
+            factory=get_statsd_logger_factory(
+                stats_class=mock_statsd,
+                stat_name_handler=always_valid,
+            )
         )
         airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
         mock_statsd.return_value.incr.assert_called_once_with("empty_key", 1, 
1)
 
     @skip_if_force_lowest_dependencies_marker
-    @conf_vars(
-        {
-            ("metrics", "statsd_datadog_enabled"): "True",
-            (
-                "metrics",
-                "stat_name_handler",
-            ): "observability.metrics.test_stats.always_valid",
-        }
-    )
     @mock.patch("datadog.DogStatsd")
     def test_does_send_stats_using_dogstatsd_when_the_name_is_valid(self, 
mock_dogstatsd):
         importlib.reload(airflow_shared.observability.metrics.stats)
         airflow_shared.observability.metrics.stats.Stats.initialize(
-            is_statsd_datadog_enabled=True,
-            is_statsd_on=False,
-            is_otel_on=False,
+            factory=lambda: datadog_logger.get_dogstatsd_logger(
+                mock_dogstatsd,
+                host="localhost",
+                port="1234",
+                namespace="airflow",
+                stat_name_handler=always_valid,
+            )
         )
 
         airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index dbf8e7f12ec..7650bd56f27 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -120,6 +120,7 @@ from airflow.sdk.execution_time.context import (
 from airflow.sdk.execution_time.sentry import Sentry
 from airflow.sdk.execution_time.xcom import XCom
 from airflow.sdk.listener import get_listener_manager
+from airflow.sdk.observability.metrics import stats_utils
 from airflow.sdk.timezone import coerce_datetime
 
 if TYPE_CHECKING:
@@ -1768,11 +1769,8 @@ def main():
     global SUPERVISOR_COMMS
     SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
 
-    Stats.initialize(
-        is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
-        is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-        is_otel_on=conf.getboolean("metrics", "otel_on"),
-    )
+    stats_factory = stats_utils.get_stats_factory(Stats)
+    Stats.initialize(factory=stats_factory)
 
     try:
         try:
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py 
b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
index aa3589ec9d5..ebf64d701b3 100644
--- a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
+++ b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
@@ -25,7 +25,7 @@ if TYPE_CHECKING:
     from airflow.sdk._shared.observability.metrics.otel_logger import 
SafeOtelLogger
 
 
-def get_otel_logger(cls) -> SafeOtelLogger:
+def get_otel_logger() -> SafeOtelLogger:
     # The config values have been deprecated and therefore,
     # if the user hasn't added them to the config, the default values won't be 
used.
     # A fallback is needed to avoid an exception.
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/stats_utils.py 
b/task-sdk/src/airflow/sdk/observability/metrics/stats_utils.py
new file mode 100644
index 00000000000..2d814e20b3e
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/observability/metrics/stats_utils.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections.abc import Callable
+
+from airflow.sdk._shared.observability.metrics.base_stats_logger import 
NoStatsLogger
+from airflow.sdk.configuration import conf
+
+
+def get_stats_factory(stats_cls) -> Callable:
+    if conf.getboolean("metrics", "statsd_datadog_enabled"):
+        from airflow.sdk.observability.metrics import datadog_logger
+
+        # Datadog needs the 'stats_cls' param, so wrap it into a 0-arg factory.
+        return lambda: datadog_logger.get_dogstatsd_logger(stats_cls)
+    if conf.getboolean("metrics", "statsd_on"):
+        from airflow.sdk.observability.metrics import statsd_logger
+
+        return statsd_logger.get_statsd_logger
+    if conf.getboolean("metrics", "otel_on"):
+        from airflow.sdk.observability.metrics import otel_logger
+
+        return otel_logger.get_otel_logger
+    return NoStatsLogger
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py 
b/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py
index 7bfe413cfba..de9af0e94f1 100644
--- a/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py
+++ b/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py
@@ -29,7 +29,7 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)
 
 
-def get_statsd_logger(cls) -> SafeStatsdLogger:
+def get_statsd_logger() -> SafeStatsdLogger:
     stats_class = conf.getimport("metrics", "statsd_custom_client_path", 
fallback=None)
 
     # no need to check for the scheduler/statsd_on -> this method is only 
called when it is set
diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py 
b/task-sdk/src/airflow/sdk/serde/__init__.py
index 9c5ebaa4f69..c9536fd174c 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -32,6 +32,7 @@ import attr
 from airflow.sdk._shared.module_loading import import_string, iter_namespace, 
qualname
 from airflow.sdk._shared.observability.metrics.stats import Stats
 from airflow.sdk.configuration import conf
+from airflow.sdk.observability.metrics import stats_utils
 from airflow.sdk.serde.typing import is_pydantic_model
 
 if TYPE_CHECKING:
@@ -370,11 +371,8 @@ def _register():
     _deserializers.clear()
     _stringifiers.clear()
 
-    Stats.initialize(
-        is_statsd_datadog_enabled=conf.getboolean("metrics", 
"statsd_datadog_enabled"),
-        is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-        is_otel_on=conf.getboolean("metrics", "otel_on"),
-    )
+    stats_factory = stats_utils.get_stats_factory(Stats)
+    Stats.initialize(factory=stats_factory)
 
     with Stats.timer("serde.load_serializers") as timer:
         serializers_module = import_module("airflow.sdk.serde.serializers")


Reply via email to