This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7232ea1b9e52bc2b390a66a70c6d76f4efc44f26
Author: Gopal Dirisala <[email protected]>
AuthorDate: Thu Sep 12 01:16:20 2024 +0530

    Align timers and timing metrics (ms) across all metrics loggers (#39908)
    
    (cherry picked from commit 08daffe1a9cef682647689fd70c207da6e2f1e2d)
---
 airflow/config_templates/config.yml    | 11 +++++++++++
 airflow/metrics/datadog_logger.py      | 15 ++++++++++++++-
 airflow/metrics/otel_logger.py         | 14 +++++++++++++-
 airflow/metrics/protocols.py           | 16 +++++++++++++++-
 airflow/models/taskinstance.py         | 19 +++++++++++++++++--
 newsfragments/39908.significant.rst    |  1 +
 tests/_internals/forbidden_warnings.py |  5 +++++
 tests/core/test_otel_logger.py         | 25 +++++++++++++++++++++----
 tests/core/test_stats.py               | 30 ++++++++++++++++++++++++++----
 9 files changed, 123 insertions(+), 13 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index d90a931db23..94a66dc51c0 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1098,6 +1098,17 @@ metrics:
       example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
       or \"^scheduler,^executor,heartbeat|timeout\""
       default: ""
+    metrics_consistency_on:
+      description: |
+        Enables metrics consistency across all metrics loggers: when enabled, timer and timing metrics are published in milliseconds by every logger.
+
+        .. warning::
+
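+          Disabling this option (publishing timer and timing metrics in seconds) is deprecated. Metrics consistency is enabled by default from Airflow 3 onwards.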
+      version_added: 2.10.0
+      type: string
+      example: ~
+      default: "True"
     statsd_on:
       description: |
         Enables sending metrics to StatsD.
diff --git a/airflow/metrics/datadog_logger.py 
b/airflow/metrics/datadog_logger.py
index 15640797730..c7bcf1986d8 100644
--- a/airflow/metrics/datadog_logger.py
+++ b/airflow/metrics/datadog_logger.py
@@ -19,9 +19,11 @@ from __future__ import annotations
 
 import datetime
 import logging
+import warnings
 from typing import TYPE_CHECKING
 
 from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.metrics.protocols import Timer
 from airflow.metrics.validators import (
     AllowListValidator,
@@ -40,6 +42,14 @@ if TYPE_CHECKING:
 
 log = logging.getLogger(__name__)
 
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
+if not metrics_consistency_on:
+    warnings.warn(
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=2,
+    )
+
 
 class SafeDogStatsdLogger:
     """DogStatsd Logger."""
@@ -134,7 +144,10 @@ class SafeDogStatsdLogger:
             tags_list = []
         if self.metrics_validator.test(stat):
             if isinstance(dt, datetime.timedelta):
-                dt = dt.total_seconds()
+                if metrics_consistency_on:
+                    dt = dt.total_seconds() * 1000.0
+                else:
+                    dt = dt.total_seconds()
             return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
         return None
 
diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index 00f539754ed..d20f68fc5d5 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -31,6 +31,7 @@ from opentelemetry.sdk.metrics._internal.export import 
ConsoleMetricExporter, Pe
 from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
 
 from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.metrics.protocols import Timer
 from airflow.metrics.validators import (
     OTEL_NAME_MAX_LENGTH,
@@ -72,6 +73,14 @@ DEFAULT_METRIC_NAME_PREFIX = "airflow"
 # Delimiter is placed between the universal metric prefix and the unique 
metric name.
 DEFAULT_METRIC_NAME_DELIMITER = "."
 
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
+if not metrics_consistency_on:
+    warnings.warn(
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=2,
+    )
+
 
 def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
     """Assembles the prefix, delimiter, and name and returns it as a string."""
@@ -275,7 +284,10 @@ class SafeOtelLogger:
         """OTel does not have a native timer, stored as a Gauge whose value is 
number of seconds elapsed."""
         if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
             if isinstance(dt, datetime.timedelta):
-                dt = dt.total_seconds()
+                if metrics_consistency_on:
+                    dt = dt.total_seconds() * 1000.0
+                else:
+                    dt = dt.total_seconds()
             self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, 
name=stat), float(dt), False, tags)
 
     def timer(
diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py
index c46942ce95f..7eef7929e02 100644
--- a/airflow/metrics/protocols.py
+++ b/airflow/metrics/protocols.py
@@ -19,12 +19,23 @@ from __future__ import annotations
 
 import datetime
 import time
+import warnings
 from typing import Union
 
+from airflow.configuration import conf
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.typing_compat import Protocol
 
 DeltaType = Union[int, float, datetime.timedelta]
 
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
+if not metrics_consistency_on:
+    warnings.warn(
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=2,
+    )
+
 
 class TimerProtocol(Protocol):
     """Type protocol for StatsLogger.timer."""
@@ -116,6 +127,9 @@ class Timer(TimerProtocol):
     def stop(self, send: bool = True) -> None:
         """Stop the timer, and optionally send it to stats backend."""
         if self._start_time is not None:
-            self.duration = time.perf_counter() - self._start_time
+            if metrics_consistency_on:
+                self.duration = 1000.0 * (time.perf_counter() - 
self._start_time)  # Convert to milliseconds.
+            else:
+                self.duration = time.perf_counter() - self._start_time
         if send and self.real_timer:
             self.real_timer.stop()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 755a4685203..d75fca612b0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -77,6 +77,7 @@ from airflow.datasets.manager import dataset_manager
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,
+    AirflowProviderDeprecationWarning,
     AirflowRescheduleException,
     AirflowSensorTimeout,
     AirflowSkipException,
@@ -173,6 +174,14 @@ if TYPE_CHECKING:
 
 PAST_DEPENDS_MET = "past_depends_met"
 
+metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
+if not metrics_consistency_on:
+    warnings.warn(
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=2,
+    )
+
 
 class TaskReturnCode(Enum):
     """
@@ -2960,7 +2969,10 @@ class TaskInstance(Base, LoggingMixin):
                     self.task_id,
                 )
                 return
-            timing = timezone.utcnow() - self.queued_dttm
+            if metrics_consistency_on:
+                timing = timezone.utcnow() - self.queued_dttm
+            else:
+                timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
         elif new_state == TaskInstanceState.QUEUED:
             metric_name = "scheduled_duration"
             if self.start_date is None:
@@ -2973,7 +2985,10 @@ class TaskInstance(Base, LoggingMixin):
                     self.task_id,
                 )
                 return
-            timing = timezone.utcnow() - self.start_date
+            if metrics_consistency_on:
+                timing = timezone.utcnow() - self.start_date
+            else:
+                timing = (timezone.utcnow() - self.start_date).total_seconds()
         else:
             raise NotImplementedError("no metric emission setup for state %s", 
new_state)
 
diff --git a/newsfragments/39908.significant.rst 
b/newsfragments/39908.significant.rst
new file mode 100644
index 00000000000..bd4a2967ba4
--- /dev/null
+++ b/newsfragments/39908.significant.rst
@@ -0,0 +1 @@
+Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, the ``[metrics] metrics_consistency_on`` option is enabled by default; you can disable it for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled.
diff --git a/tests/_internals/forbidden_warnings.py 
b/tests/_internals/forbidden_warnings.py
index c78e4b0333f..324d2ff6f98 100644
--- a/tests/_internals/forbidden_warnings.py
+++ b/tests/_internals/forbidden_warnings.py
@@ -62,6 +62,11 @@ class ForbiddenWarningsPlugin:
             # Add marker at the beginning of the markers list. In this case, 
it does not conflict with
             # filterwarnings markers, which are set explicitly in the test 
suite.
             item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), 
append=False)
+        item.add_marker(
+            pytest.mark.filterwarnings(
+                "ignore:Timer and timing metrics publish in seconds were 
deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics 
consistency to publish all the timer and timing metrics in 
milliseconds.:DeprecationWarning"
+            )
+        )
 
     @pytest.hookimpl(hookwrapper=True, trylast=True)
     def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int):
diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py
index 6cba116f652..d5697e585b4 100644
--- a/tests/core/test_otel_logger.py
+++ b/tests/core/test_otel_logger.py
@@ -25,6 +25,7 @@ import pytest
 from opentelemetry.metrics import MeterProvider
 
 from airflow.exceptions import InvalidStatsNameException
+from airflow.metrics import otel_logger, protocols
 from airflow.metrics.otel_logger import (
     OTEL_NAME_MAX_LENGTH,
     UP_DOWN_COUNTERS,
@@ -234,12 +235,22 @@ class TestOtelMetrics:
 
         assert self.map[full_name(name)].value == 1
 
-    def test_timing_new_metric(self, name):
-        self.stats.timing(name, dt=123)
+    @pytest.mark.parametrize(
+        "metrics_consistency_on",
+        [True, False],
+    )
+    def test_timing_new_metric(self, metrics_consistency_on, name):
+        import datetime
+
+        otel_logger.metrics_consistency_on = metrics_consistency_on
+
+        self.stats.timing(name, dt=datetime.timedelta(seconds=123))
 
         self.meter.get_meter().create_observable_gauge.assert_called_once_with(
             name=full_name(name), callbacks=ANY
         )
+        expected_value = 123000.0 if metrics_consistency_on else 123
+        assert self.map[full_name(name)].value == expected_value
 
     def test_timing_new_metric_with_tags(self, name):
         tags = {"hello": "world"}
@@ -265,13 +276,19 @@ class TestOtelMetrics:
     #   time.perf_count() is called once to get the starting timestamp and 
again
     #   to get the end timestamp.  timer() should return the difference as a 
float.
 
+    @pytest.mark.parametrize(
+        "metrics_consistency_on",
+        [True, False],
+    )
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
-    def test_timer_with_name_returns_float_and_stores_value(self, mock_time, 
name):
+    def test_timer_with_name_returns_float_and_stores_value(self, mock_time, 
metrics_consistency_on, name):
+        protocols.metrics_consistency_on = metrics_consistency_on
         with self.stats.timer(name) as timer:
             pass
 
         assert isinstance(timer.duration, float)
-        assert timer.duration == 3.14
+        expected_duration = 3140.0 if metrics_consistency_on else 3.14
+        assert timer.duration == expected_duration
         assert mock_time.call_count == 2
         self.meter.get_meter().create_observable_gauge.assert_called_once_with(
             name=full_name(name), callbacks=ANY
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index e0386cab1b9..23e0e21f317 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import importlib
 import logging
 import re
+import time
 from unittest import mock
 from unittest.mock import Mock
 
@@ -28,6 +29,7 @@ import statsd
 
 import airflow
 from airflow.exceptions import AirflowConfigException, 
InvalidStatsNameException, RemovedInAirflow3Warning
+from airflow.metrics import datadog_logger, protocols
 from airflow.metrics.datadog_logger import SafeDogStatsdLogger
 from airflow.metrics.statsd_logger import SafeStatsdLogger
 from airflow.metrics.validators import (
@@ -224,24 +226,44 @@ class TestDogStats:
             metric="empty_key", sample_rate=1, tags=[], value=1
         )
 
-    def test_timer(self):
-        with self.dogstatsd.timer("empty_timer"):
+    @pytest.mark.parametrize(
+        "metrics_consistency_on",
+        [True, False],
+    )
+    @mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
+    def test_timer(self, time_mock, metrics_consistency_on):
+        protocols.metrics_consistency_on = metrics_consistency_on
+
+        with self.dogstatsd.timer("empty_timer") as timer:
             pass
         self.dogstatsd_client.timed.assert_called_once_with("empty_timer", 
tags=[])
+        expected_duration = 100.0
+        if metrics_consistency_on:
+            expected_duration = 1000.0 * 100.0
+        assert expected_duration == timer.duration
+        assert time_mock.call_count == 2
 
     def test_empty_timer(self):
         with self.dogstatsd.timer():
             pass
         self.dogstatsd_client.timed.assert_not_called()
 
-    def test_timing(self):
+    @pytest.mark.parametrize(
+        "metrics_consistency_on",
+        [True, False],
+    )
+    def test_timing(self, metrics_consistency_on):
         import datetime
 
+        datadog_logger.metrics_consistency_on = metrics_consistency_on
+
         self.dogstatsd.timing("empty_timer", 123)
         
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", 
value=123, tags=[])
 
         self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
-        self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", 
value=123.0, tags=[])
+        self.dogstatsd_client.timing.assert_called_with(
+            metric="empty_timer", value=123000.0 if metrics_consistency_on 
else 123.0, tags=[]
+        )
 
     def test_gauge(self):
         self.dogstatsd.gauge("empty", 123)

Reply via email to