This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d294caab3d Refactor imports from 'airflow-core' out of 'shared/stats' (#62127)
0d294caab3d is described below
commit 0d294caab3d494eeff6184b09e8e2dda6ffc47ed
Author: Christos Bisias <[email protected]>
AuthorDate: Fri Feb 20 02:18:46 2026 +0200
Refactor imports from 'airflow-core' out of 'shared/stats' (#62127)
* refactor 'airflow-core' imports out of 'shared/stats'
* remove unused param
* skip unit tests if datadog isn't available
* fix import in task_runner
* add a test helper for the statsd_logger factory
---
airflow-core/src/airflow/dag_processing/manager.py | 8 +-
.../src/airflow/executors/base_executor.py | 8 +-
.../src/airflow/jobs/scheduler_job_runner.py | 8 +-
.../src/airflow/jobs/triggerer_job_runner.py | 8 +-
.../airflow/observability/metrics/stats_utils.py | 40 +++
.../tests/unit/dag_processing/test_manager.py | 4 +-
airflow-core/tests/unit/jobs/test_triggerer_job.py | 4 +-
.../tests/unit/observability/metrics/__init__.py | 16 ++
.../tests/unit/observability/metrics/test_stats.py | 97 +++++++
.../observability/metrics/base_stats_logger.py | 11 +-
.../airflow_shared/observability/metrics/stats.py | 21 +-
.../tests/observability/metrics/test_stats.py | 304 ++++++++-------------
.../src/airflow/sdk/execution_time/task_runner.py | 8 +-
.../sdk/observability/metrics/otel_logger.py | 2 +-
.../sdk/observability/metrics/stats_utils.py | 40 +++
.../sdk/observability/metrics/statsd_logger.py | 2 +-
task-sdk/src/airflow/sdk/serde/__init__.py | 8 +-
17 files changed, 328 insertions(+), 261 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 657595f58d7..c0dff728806 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -61,6 +61,7 @@ from airflow.models.dagbundle import DagBundleModel
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
+from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
@@ -271,11 +272,8 @@ class DagFileProcessorManager(LoggingMixin):
# Related: https://github.com/apache/airflow/pull/57459
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
self.register_exit_signals()
diff --git a/airflow-core/src/airflow/executors/base_executor.py
b/airflow-core/src/airflow/executors/base_executor.py
index 927e4c07d64..a57790744b7 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -34,6 +34,7 @@ from airflow.configuration import conf
from airflow.executors import workloads
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
+from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -175,11 +176,8 @@ class BaseExecutor(LoggingMixin):
return generator
def __init__(self, parallelism: int = PARALLELISM, team_name: str | None =
None):
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
super().__init__()
# Ensure we set this now, so that each subprocess gets the same value
from airflow.api_fastapi.auth.tokens import get_signing_args
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d592b551d95..9b77ce4592c 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -97,6 +97,7 @@ from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.models.team import Team
from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger,
TriggerFailureReason
+from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
from airflow.serialization.definitions.notset import NOTSET
@@ -1322,11 +1323,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
DagRun.set_active_spans(active_spans=self.active_spans)
BaseExecutor.set_active_spans(active_spans=self.active_spans)
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
self._run_scheduler_loop()
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index c5ac4e41f6e..ba083b8fad3 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -48,6 +48,7 @@ from airflow.executors import workloads
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.trigger import Trigger
+from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
from airflow.sdk.execution_time.comms import (
@@ -167,11 +168,8 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
def _execute(self) -> int | None:
self.log.info("Starting the triggerer")
self.register_signals()
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
try:
# Kick off runner sub-process without DB access
self.trigger_runner = TriggerRunnerSupervisor.start(
diff --git a/airflow-core/src/airflow/observability/metrics/stats_utils.py
b/airflow-core/src/airflow/observability/metrics/stats_utils.py
new file mode 100644
index 00000000000..a474de333fd
--- /dev/null
+++ b/airflow-core/src/airflow/observability/metrics/stats_utils.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections.abc import Callable
+
+from airflow._shared.observability.metrics.base_stats_logger import
NoStatsLogger
+from airflow.configuration import conf
+
+
+def get_stats_factory(stats_cls) -> Callable:  # return a 0-arg logger factory chosen from the [metrics] config
+ if conf.getboolean("metrics", "statsd_datadog_enabled"):  # checked first, so Datadog wins when several backends are enabled
+ from airflow.observability.metrics import datadog_logger  # imported lazily, only when this backend is selected
+
+ # Datadog needs the 'stats_cls' param, so wrap it into a 0-arg factory.
+ return lambda: datadog_logger.get_dogstatsd_logger(stats_cls)
+ if conf.getboolean("metrics", "statsd_on"):  # plain StatsD is only considered when Datadog is off
+ from airflow.observability.metrics import statsd_logger
+
+ return statsd_logger.get_statsd_logger
+ if conf.getboolean("metrics", "otel_on"):  # OpenTelemetry is the last backend checked
+ from airflow.observability.metrics import otel_logger
+
+ return otel_logger.get_otel_logger
+ return NoStatsLogger  # no backend enabled: the class itself is the factory; calling it yields a NoStatsLogger
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index d6cb9e819c0..dd8eb88cdb7 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1590,6 +1590,4 @@ class TestDagFileProcessorManager:
# Verify Stats.initialize was called with the expected configuration
parameters
stats_init_mock.assert_called_once()
call_kwargs = stats_init_mock.call_args.kwargs
- assert "is_statsd_datadog_enabled" in call_kwargs
- assert "is_statsd_on" in call_kwargs
- assert "is_otel_on" in call_kwargs
+ assert "factory" in call_kwargs
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 3dc6dfc9f2e..7ba89562ad0 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1233,9 +1233,7 @@ class TestTriggererJobRunner:
# Verify Stats.initialize was called with the expected configuration
parameters
stats_init_mock.assert_called_once()
call_kwargs = stats_init_mock.call_args.kwargs
- assert "is_statsd_datadog_enabled" in call_kwargs
- assert "is_statsd_on" in call_kwargs
- assert "is_otel_on" in call_kwargs
+ assert "factory" in call_kwargs
class TestTriggererMessageTypes:
diff --git a/airflow-core/tests/unit/observability/metrics/__init__.py
b/airflow-core/tests/unit/observability/metrics/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-core/tests/unit/observability/metrics/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow-core/tests/unit/observability/metrics/test_stats.py
b/airflow-core/tests/unit/observability/metrics/test_stats.py
new file mode 100644
index 00000000000..1c919acae63
--- /dev/null
+++ b/airflow-core/tests/unit/observability/metrics/test_stats.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib
+import re
+
+import pytest
+
+import airflow
+import airflow.observability.stats
+from airflow.observability.metrics import stats_utils
+
+from tests_common.test_utils.config import conf_vars
+
+
+class InvalidCustomStatsd:  # loaded by dotted path via ("metrics", "statsd_custom_client_path") in TestStats
+ """
+ This custom Statsd class is invalid because it does not subclass
+ statsd.StatsClient.
+ """
+
+ def __init__(self, host=None, port=None, prefix=None):  # accepts host/port/prefix but stores nothing
+ pass
+
+
+class TestStats:
+ def test_load_invalid_custom_stats_client(self):  # a custom client that does not subclass statsd.StatsClient must be rejected
+ with conf_vars(
+ {
+ ("metrics", "statsd_on"): "True",
+ ("metrics", "statsd_custom_client_path"):
f"{__name__}.InvalidCustomStatsd",
+ }
+ ):
+ importlib.reload(airflow._shared.observability.metrics.stats)  # NOTE(review): reloads the shared module, but the test drives airflow.observability.stats.Stats (TestDogStats reloads airflow.observability.stats instead) — confirm this reload actually resets the class under test
+ factory =
stats_utils.get_stats_factory(airflow.observability.stats.Stats)
+ airflow.observability.stats.Stats.initialize(factory=factory)  # only stores the factory; presumably the logger is built on first use, which is why the error surfaces from incr() below
+ error_message = re.escape(
+ "Your custom StatsD client must extend the statsd."
+ "StatsClient in order to ensure backwards compatibility."
+ )
+ # we assert for Exception here instead of AirflowConfigException
to not import from shared configuration
+ with pytest.raises(Exception, match=error_message):
+ airflow.observability.stats.Stats.incr("empty_key")
+ importlib.reload(airflow._shared.observability.metrics.stats)  # intended to undo module-level side effects; see the NOTE(review) above about which module is reloaded
+
+
+class TestDogStats:
+ def setup_method(self):
+ pytest.importorskip("datadog")  # skip each test in this class when the optional datadog package is not installed
+
+ def test_enabled_by_config(self):
+ """Test that enabling this sets the right instance properties"""
+ from datadog import DogStatsd
+
+ with conf_vars(
+ {
+ ("metrics", "statsd_datadog_enabled"): "True",
+ }
+ ):
+ importlib.reload(airflow.observability.stats)  # start from a freshly executed module
+ factory =
stats_utils.get_stats_factory(airflow.observability.stats.Stats)
+ airflow.observability.stats.Stats.initialize(factory=factory)
+ assert isinstance(airflow.observability.stats.Stats.dogstatsd,
DogStatsd)
+ assert not hasattr(airflow.observability.stats.Stats, "statsd")  # only the Datadog client should be exposed
+ # Avoid side-effects
+ importlib.reload(airflow.observability.stats)
+
+ def
test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self):
+ from datadog import DogStatsd
+
+ with conf_vars(
+ {
+ ("metrics", "statsd_on"): "True",
+ ("metrics", "statsd_datadog_enabled"): "True",
+ }
+ ):
+ importlib.reload(airflow.observability.stats)
+ factory =
stats_utils.get_stats_factory(airflow.observability.stats.Stats)
+ airflow.observability.stats.Stats.initialize(factory=factory)
+ assert isinstance(airflow.observability.stats.Stats.dogstatsd,
DogStatsd)  # Datadog takes precedence when both backends are enabled
+ assert not hasattr(airflow.observability.stats.Stats, "statsd")
+ importlib.reload(airflow.observability.stats)  # Avoid side-effects
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
b/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
index e15e2528e31..19aef0007d3 100644
---
a/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
+++
b/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Protocol
from .protocols import Timer
@@ -34,10 +35,7 @@ class StatsLogger(Protocol):
def initialize(
cls,
*,
- is_statsd_datadog_enabled: bool,
- is_statsd_on: bool,
- is_otel_on: bool,
- reset_instance: bool = True,
+ factory: Callable,
) -> None:
"""Initialize the StatsLogger instance."""
@@ -98,10 +96,7 @@ class NoStatsLogger:
def initialize(
cls,
*,
- is_statsd_datadog_enabled: bool,
- is_statsd_on: bool,
- is_otel_on: bool,
- reset_instance: bool = True,
+ factory: Callable,
) -> None:
"""Initialize the NoStatsLogger instance."""
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/stats.py
b/shared/observability/src/airflow_shared/observability/metrics/stats.py
index 8055f8ed95d..be153c298d6 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/stats.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/stats.py
@@ -112,27 +112,8 @@ class _Stats(type):
return getattr(instance, name)
- def initialize(cls, *, is_statsd_datadog_enabled: bool, is_statsd_on:
bool, is_otel_on: bool) -> None:
- type.__setattr__(cls, "factory", None)
+ def initialize(cls, *, factory: Callable) -> None:
type.__setattr__(cls, "instance", None)
- factory: Callable
-
- if is_statsd_datadog_enabled:
- from airflow.observability.metrics import datadog_logger
-
- # Datadog needs the cls param, so wrap it into a 0-arg factory.
- factory = lambda: datadog_logger.get_dogstatsd_logger(cls)
- elif is_statsd_on:
- from airflow.observability.metrics import statsd_logger
-
- factory = statsd_logger.get_statsd_logger
- elif is_otel_on:
- from airflow.observability.metrics import otel_logger
-
- factory = otel_logger.get_otel_logger
- else:
- factory = NoStatsLogger
-
type.__setattr__(cls, "factory", factory)
@classmethod
diff --git a/shared/observability/tests/observability/metrics/test_stats.py
b/shared/observability/tests/observability/metrics/test_stats.py
index 98f5ef24d54..173370f8f7d 100644
--- a/shared/observability/tests/observability/metrics/test_stats.py
+++ b/shared/observability/tests/observability/metrics/test_stats.py
@@ -19,8 +19,8 @@ from __future__ import annotations
import importlib
import logging
-import re
import time
+from collections.abc import Callable
from unittest import mock
from unittest.mock import Mock
@@ -31,6 +31,7 @@ import airflow_shared
import airflow_shared.observability.metrics.stats
import airflow_shared.observability.metrics.validators
from airflow_shared.observability.exceptions import InvalidStatsNameException
+from airflow_shared.observability.metrics import datadog_logger, statsd_logger
from airflow_shared.observability.metrics.datadog_logger import
SafeDogStatsdLogger
from airflow_shared.observability.metrics.statsd_logger import SafeStatsdLogger
from airflow_shared.observability.metrics.validators import (
@@ -46,14 +47,21 @@ class CustomStatsd(statsd.StatsClient):
pass
-class InvalidCustomStatsd:
- """
- This custom Statsd class is invalid because it does not subclass
- statsd.StatsClient.
- """
-
- def __init__(self, host=None, port=None, prefix=None):
- pass
+def get_statsd_logger_factory(  # test helper: build a StatsD logger factory from explicit arguments instead of conf_vars
+ stats_class,
+ metrics_allow_list: str | None = None,
+ metrics_block_list: str | None = None,
+ stat_name_handler: Callable[[str], str] | None = None,
+):
+ return lambda: statsd_logger.get_statsd_logger(  # deferred 0-arg factory, as Stats.initialize(factory=...) expects
+ stats_class=stats_class,
+ host="localhost",
+ port="1234",
+ prefix="airflow",
+ metrics_allow_list=metrics_allow_list,
+ metrics_block_list=metrics_block_list,
+ stat_name_handler=stat_name_handler,
+ )
class TestStats:
@@ -102,127 +110,79 @@ class TestStats:
def test_enabled_by_config(self):
"""Test that enabling this sets the right instance properties"""
- with conf_vars({("metrics", "statsd_on"): "True"}):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
- assert not
hasattr(airflow_shared.observability.metrics.stats.Stats, "dogstatsd")
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=get_statsd_logger_factory(stats_class=statsd.StatsClient)
+ )
+ assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
+ assert not hasattr(airflow_shared.observability.metrics.stats.Stats,
"dogstatsd")
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def test_load_custom_statsd_client(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "statsd_custom_client_path"):
f"{__name__}.CustomStatsd",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
CustomStatsd)
- # Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
-
- def test_load_invalid_custom_stats_client(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "statsd_custom_client_path"):
f"{__name__}.InvalidCustomStatsd",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- error_message = re.escape(
- "Your custom StatsD client must extend the statsd."
- "StatsClient in order to ensure backwards compatibility."
- )
- # we assert for Exception here instead of AirflowConfigException
to not import from shared configuration
- with pytest.raises(Exception, match=error_message):
-
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=get_statsd_logger_factory(stats_class=CustomStatsd)
+ )
+ assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
CustomStatsd)
+ # Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def test_load_allow_list_validator(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "metrics_allow_list"): "name1,name2",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
- PatternAllowListValidator,
- )
- assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
- "name1",
- "name2",
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=get_statsd_logger_factory(
+ stats_class=statsd.StatsClient,
+ metrics_allow_list="name1,name2",
)
+ )
+ assert isinstance(
+ airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+ PatternAllowListValidator,
+ )
+ assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
+ "name1",
+ "name2",
+ )
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def test_load_block_list_validator(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "metrics_block_list"): "name1,name2",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
- PatternBlockListValidator,
- )
- assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
- "name1",
- "name2",
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=get_statsd_logger_factory(
+ stats_class=statsd.StatsClient,
+ metrics_block_list="name1,name2",
)
+ )
+ assert isinstance(
+ airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+ PatternBlockListValidator,
+ )
+ assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
+ "name1",
+ "name2",
+ )
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def
test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "metrics_allow_list"): "name1,name2",
- ("metrics", "metrics_block_list"): "name1,name2",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
- PatternAllowListValidator,
- )
- assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
- "name1",
- "name2",
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=get_statsd_logger_factory(
+ stats_class=statsd.StatsClient,
+ metrics_allow_list="name1,name2",
+ metrics_block_list="name1,name2",
)
+ )
+ assert isinstance(
+ airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+ PatternAllowListValidator,
+ )
+ assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
+ "name1",
+ "name2",
+ )
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
@@ -305,41 +265,6 @@ class TestDogStats:
metric="empty", sample_rate=1, value=1, tags=[]
)
- def test_enabled_by_config(self):
- """Test that enabling this sets the right instance properties"""
- from datadog import DogStatsd
-
- with conf_vars({("metrics", "statsd_datadog_enabled"): "True"}):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=True,
- is_statsd_on=False,
- is_otel_on=False,
- )
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.dogstatsd,
DogStatsd)
- assert not
hasattr(airflow_shared.observability.metrics.stats.Stats, "statsd")
- # Avoid side-effects
- importlib.reload(airflow_shared.observability.metrics.stats)
-
- def
test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self):
- from datadog import DogStatsd
-
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "statsd_datadog_enabled"): "True",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=True,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.dogstatsd,
DogStatsd)
- assert not
hasattr(airflow_shared.observability.metrics.stats.Stats, "statsd")
- importlib.reload(airflow_shared.observability.metrics.stats)
-
class TestStatsAllowAndBlockLists:
@pytest.mark.parametrize(
@@ -399,57 +324,59 @@ class TestPatternValidatorConfigOption:
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
- stats_on = {("metrics", "statsd_on"): "True"}
- allow_list = {("metrics", "metrics_allow_list"): "foo,bar"}
- block_list = {("metrics", "metrics_block_list"): "foo,bar"}
-
@pytest.mark.parametrize(
- ("config", "expected"),
+ ("allow_list", "block_list", "expected"),
[
pytest.param(
- {**stats_on},
+ None,
+ None,
PatternAllowListValidator,
id="pattern_allow_by_default",
),
pytest.param(
- {**stats_on, **allow_list},
+ "foo,bar",
+ None,
PatternAllowListValidator,
id="pattern_allow_list_provided",
),
pytest.param(
- {**stats_on, **block_list},
+ None,
+ "foo,bar",
PatternBlockListValidator,
id="pattern_block_list_provided",
),
pytest.param(
- {**stats_on, **allow_list, **block_list},
+ "foo,bar",
+ "foo,bar",
PatternAllowListValidator,
- id="pattern_block_list_provided",
+ id="pattern_both_lists_provided",
),
],
)
- def test_pattern_picker(self, config, expected):
- with conf_vars(config):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
+ def test_pattern_picker(self, allow_list, block_list, expected):
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=get_statsd_logger_factory(
+ stats_class=statsd.StatsClient,
+ metrics_allow_list=allow_list,
+ metrics_block_list=block_list,
)
+ )
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.instance.metrics_validator,
expected
- )
+ assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
+ assert isinstance(
+
airflow_shared.observability.metrics.stats.Stats.instance.metrics_validator,
expected
+ )
- @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"):
"baz,qux"})
def test_setting_allow_and_block_logs_warning(self, caplog):
with caplog.at_level(logging.WARNING):
importlib.reload(airflow_shared.observability.metrics.stats)
airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
+ factory=get_statsd_logger_factory(
+ stats_class=statsd.StatsClient,
+ metrics_allow_list="baz,qux",
+ metrics_block_list="foo,bar",
+ )
)
assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
@@ -598,43 +525,30 @@ class TestCustomStatsName:
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
mock_dogstatsd.return_value.assert_not_called()
- @conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- (
- "metrics",
- "stat_name_handler",
- ): "observability.metrics.test_stats.always_valid",
- }
- )
@mock.patch("statsd.StatsClient")
def test_does_send_stats_using_statsd_when_the_name_is_valid(self,
mock_statsd):
importlib.reload(airflow_shared.observability.metrics.stats)
airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
+ factory=get_statsd_logger_factory(
+ stats_class=mock_statsd,
+ stat_name_handler=always_valid,
+ )
)
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
mock_statsd.return_value.incr.assert_called_once_with("empty_key", 1,
1)
@skip_if_force_lowest_dependencies_marker
- @conf_vars(
- {
- ("metrics", "statsd_datadog_enabled"): "True",
- (
- "metrics",
- "stat_name_handler",
- ): "observability.metrics.test_stats.always_valid",
- }
- )
@mock.patch("datadog.DogStatsd")
def test_does_send_stats_using_dogstatsd_when_the_name_is_valid(self,
mock_dogstatsd):
importlib.reload(airflow_shared.observability.metrics.stats)
airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=True,
- is_statsd_on=False,
- is_otel_on=False,
+ factory=lambda: datadog_logger.get_dogstatsd_logger(
+ mock_dogstatsd,
+ host="localhost",
+ port="1234",
+ namespace="airflow",
+ stat_name_handler=always_valid,
+ )
)
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index dbf8e7f12ec..7650bd56f27 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -120,6 +120,7 @@ from airflow.sdk.execution_time.context import (
from airflow.sdk.execution_time.sentry import Sentry
from airflow.sdk.execution_time.xcom import XCom
from airflow.sdk.listener import get_listener_manager
+from airflow.sdk.observability.metrics import stats_utils
from airflow.sdk.timezone import coerce_datetime
if TYPE_CHECKING:
@@ -1768,11 +1769,8 @@ def main():
global SUPERVISOR_COMMS
SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
try:
try:
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
index aa3589ec9d5..ebf64d701b3 100644
--- a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
+++ b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
@@ -25,7 +25,7 @@ if TYPE_CHECKING:
from airflow.sdk._shared.observability.metrics.otel_logger import
SafeOtelLogger
-def get_otel_logger(cls) -> SafeOtelLogger:
+def get_otel_logger() -> SafeOtelLogger:
# The config values have been deprecated and therefore,
# if the user hasn't added them to the config, the default values won't be
used.
# A fallback is needed to avoid an exception.
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/stats_utils.py b/task-sdk/src/airflow/sdk/observability/metrics/stats_utils.py
new file mode 100644
index 00000000000..2d814e20b3e
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/observability/metrics/stats_utils.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections.abc import Callable
+
+from airflow.sdk._shared.observability.metrics.base_stats_logger import
NoStatsLogger
+from airflow.sdk.configuration import conf
+
+
+def get_stats_factory(stats_cls) -> Callable:
+ if conf.getboolean("metrics", "statsd_datadog_enabled"):
+ from airflow.sdk.observability.metrics import datadog_logger
+
+ # Datadog needs the 'stats_cls' param, so wrap it into a 0-arg factory.
+ return lambda: datadog_logger.get_dogstatsd_logger(stats_cls)
+ if conf.getboolean("metrics", "statsd_on"):
+ from airflow.sdk.observability.metrics import statsd_logger
+
+ return statsd_logger.get_statsd_logger
+ if conf.getboolean("metrics", "otel_on"):
+ from airflow.sdk.observability.metrics import otel_logger
+
+ return otel_logger.get_otel_logger
+ return NoStatsLogger
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py b/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py
index 7bfe413cfba..de9af0e94f1 100644
--- a/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py
+++ b/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py
@@ -29,7 +29,7 @@ if TYPE_CHECKING:
log = logging.getLogger(__name__)
-def get_statsd_logger(cls) -> SafeStatsdLogger:
+def get_statsd_logger() -> SafeStatsdLogger:
stats_class = conf.getimport("metrics", "statsd_custom_client_path",
fallback=None)
# no need to check for the scheduler/statsd_on -> this method is only
called when it is set
diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py b/task-sdk/src/airflow/sdk/serde/__init__.py
index 9c5ebaa4f69..c9536fd174c 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -32,6 +32,7 @@ import attr
from airflow.sdk._shared.module_loading import import_string, iter_namespace,
qualname
from airflow.sdk._shared.observability.metrics.stats import Stats
from airflow.sdk.configuration import conf
+from airflow.sdk.observability.metrics import stats_utils
from airflow.sdk.serde.typing import is_pydantic_model
if TYPE_CHECKING:
@@ -370,11 +371,8 @@ def _register():
_deserializers.clear()
_stringifiers.clear()
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
with Stats.timer("serde.load_serializers") as timer:
serializers_module = import_module("airflow.sdk.serde.serializers")