This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 08daffe1a9 Align timers and timing metrics (ms) across all metrics
loggers (#39908)
08daffe1a9 is described below
commit 08daffe1a9cef682647689fd70c207da6e2f1e2d
Author: Gopal Dirisala <[email protected]>
AuthorDate: Thu Sep 12 01:16:20 2024 +0530
Align timers and timing metrics (ms) across all metrics loggers (#39908)
---
airflow/config_templates/config.yml | 11 +++++++++++
airflow/metrics/datadog_logger.py | 15 ++++++++++++++-
airflow/metrics/otel_logger.py | 14 +++++++++++++-
airflow/metrics/protocols.py | 16 +++++++++++++++-
airflow/models/taskinstance.py | 19 +++++++++++++++++--
newsfragments/39908.significant.rst | 1 +
tests/_internals/forbidden_warnings.py | 5 +++++
tests/core/test_otel_logger.py | 25 +++++++++++++++++++++----
tests/core/test_stats.py | 30 ++++++++++++++++++++++++++----
9 files changed, 123 insertions(+), 13 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 2e624db827..9c9f7c2153 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1106,6 +1106,17 @@ metrics:
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
+ metrics_consistency_on:
+ description: |
+ Enables metrics consistency across all metrics loggers (e.g. timer and
timing metrics are published in milliseconds).
+
+ .. warning::
+
+ It is enabled by default from Airflow 3.
+ version_added: 2.10.0
+ type: string
+ example: ~
+ default: "True"
statsd_on:
description: |
Enables sending metrics to StatsD.
diff --git a/airflow/metrics/datadog_logger.py
b/airflow/metrics/datadog_logger.py
index 1564079773..c7bcf1986d 100644
--- a/airflow/metrics/datadog_logger.py
+++ b/airflow/metrics/datadog_logger.py
@@ -19,9 +19,11 @@ from __future__ import annotations
import datetime
import logging
+import warnings
from typing import TYPE_CHECKING
from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
AllowListValidator,
@@ -40,6 +42,14 @@ if TYPE_CHECKING:
log = logging.getLogger(__name__)
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on",
fallback=True)
+if not metrics_consistency_on:
+ warnings.warn(
+ "Timer and timing metrics publish in seconds were deprecated. It is
enabled by default from Airflow 3 onwards. Enable metrics consistency to
publish all the timer and timing metrics in milliseconds.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+
class SafeDogStatsdLogger:
"""DogStatsd Logger."""
@@ -134,7 +144,10 @@ class SafeDogStatsdLogger:
tags_list = []
if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
- dt = dt.total_seconds()
+ if metrics_consistency_on:
+ dt = dt.total_seconds() * 1000.0
+ else:
+ dt = dt.total_seconds()
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
return None
diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index 5dac960c16..e8d0f54d73 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -31,6 +31,7 @@ from opentelemetry.sdk.metrics._internal.export import
ConsoleMetricExporter, Pe
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
@@ -71,6 +72,14 @@ DEFAULT_METRIC_NAME_PREFIX = "airflow"
# Delimiter is placed between the universal metric prefix and the unique
metric name.
DEFAULT_METRIC_NAME_DELIMITER = "."
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on",
fallback=True)
+if not metrics_consistency_on:
+ warnings.warn(
+ "Timer and timing metrics publish in seconds were deprecated. It is
enabled by default from Airflow 3 onwards. Enable metrics consistency to
publish all the timer and timing metrics in milliseconds.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+
def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
"""Assembles the prefix, delimiter, and name and returns it as a string."""
@@ -274,7 +283,10 @@ class SafeOtelLogger:
"""OTel does not have a native timer, stored as a Gauge whose value is
number of seconds elapsed."""
if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
- dt = dt.total_seconds()
+ if metrics_consistency_on:
+ dt = dt.total_seconds() * 1000.0
+ else:
+ dt = dt.total_seconds()
self.metrics_map.set_gauge_value(full_name(prefix=self.prefix,
name=stat), float(dt), False, tags)
def timer(
diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py
index c46942ce95..7eef7929e0 100644
--- a/airflow/metrics/protocols.py
+++ b/airflow/metrics/protocols.py
@@ -19,12 +19,23 @@ from __future__ import annotations
import datetime
import time
+import warnings
from typing import Union
+from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.typing_compat import Protocol
DeltaType = Union[int, float, datetime.timedelta]
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on",
fallback=True)
+if not metrics_consistency_on:
+ warnings.warn(
+ "Timer and timing metrics publish in seconds were deprecated. It is
enabled by default from Airflow 3 onwards. Enable metrics consistency to
publish all the timer and timing metrics in milliseconds.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+
class TimerProtocol(Protocol):
"""Type protocol for StatsLogger.timer."""
@@ -116,6 +127,9 @@ class Timer(TimerProtocol):
def stop(self, send: bool = True) -> None:
"""Stop the timer, and optionally send it to stats backend."""
if self._start_time is not None:
- self.duration = time.perf_counter() - self._start_time
+ if metrics_consistency_on:
+ self.duration = 1000.0 * (time.perf_counter() -
self._start_time) # Convert to milliseconds.
+ else:
+ self.duration = time.perf_counter() - self._start_time
if send and self.real_timer:
self.real_timer.stop()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 165f5c7987..5f82d84fe5 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -74,6 +74,7 @@ from airflow.datasets.manager import dataset_manager
from airflow.exceptions import (
AirflowException,
AirflowFailException,
+ AirflowProviderDeprecationWarning,
AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
@@ -168,6 +169,14 @@ if TYPE_CHECKING:
PAST_DEPENDS_MET = "past_depends_met"
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on",
fallback=True)
+if not metrics_consistency_on:
+ warnings.warn(
+ "Timer and timing metrics publish in seconds were deprecated. It is
enabled by default from Airflow 3 onwards. Enable metrics consistency to
publish all the timer and timing metrics in milliseconds.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+
class TaskReturnCode(Enum):
"""
@@ -2809,7 +2818,10 @@ class TaskInstance(Base, LoggingMixin):
self.task_id,
)
return
- timing = timezone.utcnow() - self.queued_dttm
+ if metrics_consistency_on:
+ timing = timezone.utcnow() - self.queued_dttm
+ else:
+ timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
elif new_state == TaskInstanceState.QUEUED:
metric_name = "scheduled_duration"
if self.start_date is None:
@@ -2822,7 +2834,10 @@ class TaskInstance(Base, LoggingMixin):
self.task_id,
)
return
- timing = timezone.utcnow() - self.start_date
+ if metrics_consistency_on:
+ timing = timezone.utcnow() - self.start_date
+ else:
+ timing = (timezone.utcnow() - self.start_date).total_seconds()
else:
raise NotImplementedError("no metric emission setup for state %s",
new_state)
diff --git a/newsfragments/39908.significant.rst
b/newsfragments/39908.significant.rst
new file mode 100644
index 0000000000..bd4a2967ba
--- /dev/null
+++ b/newsfragments/39908.significant.rst
@@ -0,0 +1 @@
+Publishing timer and timing metrics in seconds has been deprecated. In Airflow
3, the ``metrics_consistency_on`` option in the ``[metrics]`` section is enabled
by default. You can disable this for backward compatibility. To publish all
timer and timing metrics in milliseconds, ensure metrics consistency is enabled.
diff --git a/tests/_internals/forbidden_warnings.py
b/tests/_internals/forbidden_warnings.py
index c78e4b0333..324d2ff6f9 100644
--- a/tests/_internals/forbidden_warnings.py
+++ b/tests/_internals/forbidden_warnings.py
@@ -62,6 +62,11 @@ class ForbiddenWarningsPlugin:
# Add marker at the beginning of the markers list. In this case,
it does not conflict with
# filterwarnings markers, which are set explicitly in the test
suite.
item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"),
append=False)
+ item.add_marker(
+ pytest.mark.filterwarnings(
+ "ignore:Timer and timing metrics publish in seconds were
deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics
consistency to publish all the timer and timing metrics in
milliseconds.:DeprecationWarning"
+ )
+ )
@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int):
diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py
index 6cba116f65..d5697e585b 100644
--- a/tests/core/test_otel_logger.py
+++ b/tests/core/test_otel_logger.py
@@ -25,6 +25,7 @@ import pytest
from opentelemetry.metrics import MeterProvider
from airflow.exceptions import InvalidStatsNameException
+from airflow.metrics import otel_logger, protocols
from airflow.metrics.otel_logger import (
OTEL_NAME_MAX_LENGTH,
UP_DOWN_COUNTERS,
@@ -234,12 +235,22 @@ class TestOtelMetrics:
assert self.map[full_name(name)].value == 1
- def test_timing_new_metric(self, name):
- self.stats.timing(name, dt=123)
+ @pytest.mark.parametrize(
+ "metrics_consistency_on",
+ [True, False],
+ )
+ def test_timing_new_metric(self, metrics_consistency_on, name):
+ import datetime
+
+ otel_logger.metrics_consistency_on = metrics_consistency_on
+
+ self.stats.timing(name, dt=datetime.timedelta(seconds=123))
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
+ expected_value = 123000.0 if metrics_consistency_on else 123
+ assert self.map[full_name(name)].value == expected_value
def test_timing_new_metric_with_tags(self, name):
tags = {"hello": "world"}
@@ -265,13 +276,19 @@ class TestOtelMetrics:
# time.perf_count() is called once to get the starting timestamp and
again
# to get the end timestamp. timer() should return the difference as a
float.
+ @pytest.mark.parametrize(
+ "metrics_consistency_on",
+ [True, False],
+ )
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
- def test_timer_with_name_returns_float_and_stores_value(self, mock_time,
name):
+ def test_timer_with_name_returns_float_and_stores_value(self, mock_time,
metrics_consistency_on, name):
+ protocols.metrics_consistency_on = metrics_consistency_on
with self.stats.timer(name) as timer:
pass
assert isinstance(timer.duration, float)
- assert timer.duration == 3.14
+ expected_duration = 3140.0 if metrics_consistency_on else 3.14
+ assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index 902a0ed003..5127b95927 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import importlib
import logging
import re
+import time
from unittest import mock
from unittest.mock import Mock
@@ -28,6 +29,7 @@ import statsd
import airflow
from airflow.exceptions import AirflowConfigException,
InvalidStatsNameException, RemovedInAirflow3Warning
+from airflow.metrics import datadog_logger, protocols
from airflow.metrics.datadog_logger import SafeDogStatsdLogger
from airflow.metrics.statsd_logger import SafeStatsdLogger
from airflow.metrics.validators import (
@@ -224,24 +226,44 @@ class TestDogStats:
metric="empty_key", sample_rate=1, tags=[], value=1
)
- def test_timer(self):
- with self.dogstatsd.timer("empty_timer"):
+ @pytest.mark.parametrize(
+ "metrics_consistency_on",
+ [True, False],
+ )
+ @mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
+ def test_timer(self, time_mock, metrics_consistency_on):
+ protocols.metrics_consistency_on = metrics_consistency_on
+
+ with self.dogstatsd.timer("empty_timer") as timer:
pass
self.dogstatsd_client.timed.assert_called_once_with("empty_timer",
tags=[])
+ expected_duration = 100.0
+ if metrics_consistency_on:
+ expected_duration = 1000.0 * 100.0
+ assert expected_duration == timer.duration
+ assert time_mock.call_count == 2
def test_empty_timer(self):
with self.dogstatsd.timer():
pass
self.dogstatsd_client.timed.assert_not_called()
- def test_timing(self):
+ @pytest.mark.parametrize(
+ "metrics_consistency_on",
+ [True, False],
+ )
+ def test_timing(self, metrics_consistency_on):
import datetime
+ datadog_logger.metrics_consistency_on = metrics_consistency_on
+
self.dogstatsd.timing("empty_timer", 123)
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer",
value=123, tags=[])
self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
- self.dogstatsd_client.timing.assert_called_with(metric="empty_timer",
value=123.0, tags=[])
+ self.dogstatsd_client.timing.assert_called_with(
+ metric="empty_timer", value=123000.0 if metrics_consistency_on
else 123.0, tags=[]
+ )
def test_gauge(self):
self.dogstatsd.gauge("empty", 123)