This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4f68b5aa82cd7b1a60de1f2c76c1e14c7ad8c5d4
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Nov 13 16:28:37 2024 +0000

    Refactor & rename `metrics_consistency_on` conf to `timer_unit_consistency` 
(#43966)
    
    Changes:
    - Replaces the `metrics_consistency_on` config with 
`timer_unit_consistency` for better clarity!
    - Improves the newsfragment entry & deprecation warning
    - Changes the default to be `False` so folks aren't caught by surprise.
    
    We should backport this to 2.11 and remove this setting from Airflow main.
    
    (cherry picked from commit 1bd061df6e4409c5330d258d11fb7633e861a622)
---
 airflow/config_templates/config.yml    | 19 ++++++++----
 airflow/metrics/datadog_logger.py      | 12 ++++----
 airflow/metrics/otel_logger.py         | 12 ++++----
 airflow/metrics/protocols.py           | 12 ++++----
 airflow/models/taskinstance.py         | 13 ++++----
 newsfragments/39908.significant.rst    | 12 +++++++-
 tests/_internals/forbidden_warnings.py |  2 +-
 tests/core/test_otel_logger.py         | 55 +++++++++++++++++++++++++---------
 tests/core/test_stats.py               | 16 +++++-----
 9 files changed, 98 insertions(+), 55 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 94a66dc51c0..1e52a05bd43 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1098,17 +1098,24 @@ metrics:
       example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
       or \"^scheduler,^executor,heartbeat|timeout\""
       default: ""
-    metrics_consistency_on:
-      description: |
-        Enables metrics consistency across all metrics loggers (ex: timer and 
timing metrics).
+    # TODO: Remove 'timer_unit_consistency' in Airflow 3.0
+    timer_unit_consistency:
+      description: |
+        Controls the consistency of timer units across all metrics loggers
+        (e.g., Statsd, Datadog, OpenTelemetry)
+        for timing and duration-based metrics. When enabled, all timers will 
publish
+        metrics in milliseconds for consistency and alignment with Airflow's 
default
+        metrics behavior in version 3.0+.
 
         .. warning::
 
-          It is enabled by default from Airflow 3.
-      version_added: 2.10.0
+          It will be the default behavior from Airflow 3.0. If disabled, 
timers may publish
+          in seconds for backwards compatibility, though it is recommended to 
enable this
+          setting to ensure metric uniformity and forward-compat with Airflow 
3.
+      version_added: 2.11.0
       type: string
       example: ~
-      default: "True"
+      default: "False"
     statsd_on:
       description: |
         Enables sending metrics to StatsD.
diff --git a/airflow/metrics/datadog_logger.py 
b/airflow/metrics/datadog_logger.py
index c7bcf1986d8..00aa01c88b7 100644
--- a/airflow/metrics/datadog_logger.py
+++ b/airflow/metrics/datadog_logger.py
@@ -23,7 +23,7 @@ import warnings
 from typing import TYPE_CHECKING
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.metrics.protocols import Timer
 from airflow.metrics.validators import (
     AllowListValidator,
@@ -42,11 +42,11 @@ if TYPE_CHECKING:
 
 log = logging.getLogger(__name__)
 
-metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
-if not metrics_consistency_on:
+timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
+if not timer_unit_consistency:
     warnings.warn(
-        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
-        AirflowProviderDeprecationWarning,
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to 
publish all the timer and timing metrics in milliseconds.",
+        RemovedInAirflow3Warning,
         stacklevel=2,
     )
 
@@ -144,7 +144,7 @@ class SafeDogStatsdLogger:
             tags_list = []
         if self.metrics_validator.test(stat):
             if isinstance(dt, datetime.timedelta):
-                if metrics_consistency_on:
+                if timer_unit_consistency:
                     dt = dt.total_seconds() * 1000.0
                 else:
                     dt = dt.total_seconds()
diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index d20f68fc5d5..e22f1d640ee 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -31,7 +31,7 @@ from opentelemetry.sdk.metrics._internal.export import 
ConsoleMetricExporter, Pe
 from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.metrics.protocols import Timer
 from airflow.metrics.validators import (
     OTEL_NAME_MAX_LENGTH,
@@ -73,11 +73,11 @@ DEFAULT_METRIC_NAME_PREFIX = "airflow"
 # Delimiter is placed between the universal metric prefix and the unique 
metric name.
 DEFAULT_METRIC_NAME_DELIMITER = "."
 
-metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
-if not metrics_consistency_on:
+timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
+if not timer_unit_consistency:
     warnings.warn(
-        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
-        AirflowProviderDeprecationWarning,
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to 
publish all the timer and timing metrics in milliseconds.",
+        RemovedInAirflow3Warning,
         stacklevel=2,
     )
 
@@ -284,7 +284,7 @@ class SafeOtelLogger:
         """OTel does not have a native timer, stored as a Gauge whose value is 
number of seconds elapsed."""
         if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
             if isinstance(dt, datetime.timedelta):
-                if metrics_consistency_on:
+                if timer_unit_consistency:
                     dt = dt.total_seconds() * 1000.0
                 else:
                     dt = dt.total_seconds()
diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py
index 7eef7929e02..0d12704e87a 100644
--- a/airflow/metrics/protocols.py
+++ b/airflow/metrics/protocols.py
@@ -23,16 +23,16 @@ import warnings
 from typing import Union
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.typing_compat import Protocol
 
 DeltaType = Union[int, float, datetime.timedelta]
 
-metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
-if not metrics_consistency_on:
+timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
+if not timer_unit_consistency:
     warnings.warn(
-        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
-        AirflowProviderDeprecationWarning,
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to 
publish all the timer and timing metrics in milliseconds.",
+        RemovedInAirflow3Warning,
         stacklevel=2,
     )
 
@@ -127,7 +127,7 @@ class Timer(TimerProtocol):
     def stop(self, send: bool = True) -> None:
         """Stop the timer, and optionally send it to stats backend."""
         if self._start_time is not None:
-            if metrics_consistency_on:
+            if timer_unit_consistency:
                 self.duration = 1000.0 * (time.perf_counter() - 
self._start_time)  # Convert to milliseconds.
             else:
                 self.duration = time.perf_counter() - self._start_time
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d75fca612b0..3871497f9e0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -77,7 +77,6 @@ from airflow.datasets.manager import dataset_manager
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,
-    AirflowProviderDeprecationWarning,
     AirflowRescheduleException,
     AirflowSensorTimeout,
     AirflowSkipException,
@@ -174,11 +173,11 @@ if TYPE_CHECKING:
 
 PAST_DEPENDS_MET = "past_depends_met"
 
-metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", 
fallback=True)
-if not metrics_consistency_on:
+timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
+if not timer_unit_consistency:
     warnings.warn(
-        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable metrics consistency to 
publish all the timer and timing metrics in milliseconds.",
-        AirflowProviderDeprecationWarning,
+        "Timer and timing metrics publish in seconds were deprecated. It is 
enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to 
publish all the timer and timing metrics in milliseconds.",
+        RemovedInAirflow3Warning,
         stacklevel=2,
     )
 
@@ -2969,7 +2968,7 @@ class TaskInstance(Base, LoggingMixin):
                     self.task_id,
                 )
                 return
-            if metrics_consistency_on:
+            if timer_unit_consistency:
                 timing = timezone.utcnow() - self.queued_dttm
             else:
                 timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
@@ -2985,7 +2984,7 @@ class TaskInstance(Base, LoggingMixin):
                     self.task_id,
                 )
                 return
-            if metrics_consistency_on:
+            if timer_unit_consistency:
                 timing = timezone.utcnow() - self.start_date
             else:
                 timing = (timezone.utcnow() - self.start_date).total_seconds()
diff --git a/newsfragments/39908.significant.rst 
b/newsfragments/39908.significant.rst
index bd4a2967ba4..d5ba99fa9fa 100644
--- a/newsfragments/39908.significant.rst
+++ b/newsfragments/39908.significant.rst
@@ -1 +1,11 @@
-Publishing timer and timing metrics in seconds has been deprecated. In Airflow 
3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can 
disable this for backward compatibility. To publish all timer and timing 
metrics in milliseconds, ensure metrics consistency is enabled
+Publishing timer and timing metrics in seconds is now deprecated.
+
+In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` 
section will be
+enabled by default and the setting itself will be removed. This will standardize 
all timer and timing metrics to
+milliseconds across all metric loggers.
+
+**Users integrating with Datadog, OpenTelemetry, or other metric backends** 
should enable this setting. For users using
+``statsd``, this change has no effect.
+
+If you need backward compatibility, you can leave this setting disabled 
temporarily, but enabling
+``timer_unit_consistency`` is encouraged to future-proof your metrics setup.
diff --git a/tests/_internals/forbidden_warnings.py 
b/tests/_internals/forbidden_warnings.py
index 324d2ff6f98..49389500c8a 100644
--- a/tests/_internals/forbidden_warnings.py
+++ b/tests/_internals/forbidden_warnings.py
@@ -64,7 +64,7 @@ class ForbiddenWarningsPlugin:
             item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), 
append=False)
         item.add_marker(
             pytest.mark.filterwarnings(
-                "ignore:Timer and timing metrics publish in seconds were 
deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics 
consistency to publish all the timer and timing metrics in 
milliseconds.:DeprecationWarning"
+                "ignore:Timer and timing metrics publish in seconds were 
deprecated. It is enabled by default from Airflow 3 onwards. Enable 
timer_unit_consistency to publish all the timer and timing metrics in 
milliseconds.:DeprecationWarning"
             )
         )
 
diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py
index d5697e585b4..a4bf7c4c415 100644
--- a/tests/core/test_otel_logger.py
+++ b/tests/core/test_otel_logger.py
@@ -236,20 +236,20 @@ class TestOtelMetrics:
         assert self.map[full_name(name)].value == 1
 
     @pytest.mark.parametrize(
-        "metrics_consistency_on",
+        "timer_unit_consistency",
         [True, False],
     )
-    def test_timing_new_metric(self, metrics_consistency_on, name):
+    def test_timing_new_metric(self, timer_unit_consistency, name):
         import datetime
 
-        otel_logger.metrics_consistency_on = metrics_consistency_on
+        otel_logger.timer_unit_consistency = timer_unit_consistency
 
         self.stats.timing(name, dt=datetime.timedelta(seconds=123))
 
         self.meter.get_meter().create_observable_gauge.assert_called_once_with(
             name=full_name(name), callbacks=ANY
         )
-        expected_value = 123000.0 if metrics_consistency_on else 123
+        expected_value = 123000.0 if timer_unit_consistency else 123
         assert self.map[full_name(name)].value == expected_value
 
     def test_timing_new_metric_with_tags(self, name):
@@ -277,54 +277,81 @@ class TestOtelMetrics:
     #   to get the end timestamp.  timer() should return the difference as a 
float.
 
     @pytest.mark.parametrize(
-        "metrics_consistency_on",
+        "timer_unit_consistency",
         [True, False],
     )
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
-    def test_timer_with_name_returns_float_and_stores_value(self, mock_time, 
metrics_consistency_on, name):
-        protocols.metrics_consistency_on = metrics_consistency_on
+    def test_timer_with_name_returns_float_and_stores_value(self, mock_time, 
timer_unit_consistency, name):
+        protocols.timer_unit_consistency = timer_unit_consistency
         with self.stats.timer(name) as timer:
             pass
 
         assert isinstance(timer.duration, float)
-        expected_duration = 3140.0 if metrics_consistency_on else 3.14
+        expected_duration = 3140.0 if timer_unit_consistency else 3.14
         assert timer.duration == expected_duration
         assert mock_time.call_count == 2
         self.meter.get_meter().create_observable_gauge.assert_called_once_with(
             name=full_name(name), callbacks=ANY
         )
 
+    @pytest.mark.parametrize(
+        "timer_unit_consistency",
+        [True, False],
+    )
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
-    def test_timer_no_name_returns_float_but_does_not_store_value(self, 
mock_time, name):
+    def test_timer_no_name_returns_float_but_does_not_store_value(
+        self, mock_time, timer_unit_consistency, name
+    ):
+        protocols.timer_unit_consistency = timer_unit_consistency
         with self.stats.timer() as timer:
             pass
 
         assert isinstance(timer.duration, float)
-        assert timer.duration == 3.14
+        expected_duration = 3140.0 if timer_unit_consistency else 3.14
+        assert timer.duration == expected_duration
         assert mock_time.call_count == 2
         self.meter.get_meter().create_observable_gauge.assert_not_called()
 
+    @pytest.mark.parametrize(
+        "timer_unit_consistency",
+        [
+            True,
+            False,
+        ],
+    )
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
-    def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
+    def test_timer_start_and_stop_manually_send_false(self, mock_time, 
timer_unit_consistency, name):
+        protocols.timer_unit_consistency = timer_unit_consistency
+
         timer = self.stats.timer(name)
         timer.start()
         # Perform some task
         timer.stop(send=False)
 
         assert isinstance(timer.duration, float)
-        assert timer.duration == 3.14
+        expected_value = 3140.0 if timer_unit_consistency else 3.14
+        assert timer.duration == expected_value
         assert mock_time.call_count == 2
         self.meter.get_meter().create_observable_gauge.assert_not_called()
 
+    @pytest.mark.parametrize(
+        "timer_unit_consistency",
+        [
+            True,
+            False,
+        ],
+    )
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
-    def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
+    def test_timer_start_and_stop_manually_send_true(self, mock_time, 
timer_unit_consistency, name):
+        protocols.timer_unit_consistency = timer_unit_consistency
         timer = self.stats.timer(name)
         timer.start()
         # Perform some task
         timer.stop(send=True)
 
         assert isinstance(timer.duration, float)
-        assert timer.duration == 3.14
+        expected_value = 3140.0 if timer_unit_consistency else 3.14
+        assert timer.duration == expected_value
         assert mock_time.call_count == 2
         self.meter.get_meter().create_observable_gauge.assert_called_once_with(
             name=full_name(name), callbacks=ANY
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index 23e0e21f317..c5ef3f2c2af 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -227,18 +227,18 @@ class TestDogStats:
         )
 
     @pytest.mark.parametrize(
-        "metrics_consistency_on",
+        "timer_unit_consistency",
         [True, False],
     )
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
-    def test_timer(self, time_mock, metrics_consistency_on):
-        protocols.metrics_consistency_on = metrics_consistency_on
+    def test_timer(self, time_mock, timer_unit_consistency):
+        protocols.timer_unit_consistency = timer_unit_consistency
 
         with self.dogstatsd.timer("empty_timer") as timer:
             pass
         self.dogstatsd_client.timed.assert_called_once_with("empty_timer", 
tags=[])
         expected_duration = 100.0
-        if metrics_consistency_on:
+        if timer_unit_consistency:
             expected_duration = 1000.0 * 100.0
         assert expected_duration == timer.duration
         assert time_mock.call_count == 2
@@ -249,20 +249,20 @@ class TestDogStats:
         self.dogstatsd_client.timed.assert_not_called()
 
     @pytest.mark.parametrize(
-        "metrics_consistency_on",
+        "timer_unit_consistency",
         [True, False],
     )
-    def test_timing(self, metrics_consistency_on):
+    def test_timing(self, timer_unit_consistency):
         import datetime
 
-        datadog_logger.metrics_consistency_on = metrics_consistency_on
+        datadog_logger.timer_unit_consistency = timer_unit_consistency
 
         self.dogstatsd.timing("empty_timer", 123)
         
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", 
value=123, tags=[])
 
         self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
         self.dogstatsd_client.timing.assert_called_with(
-            metric="empty_timer", value=123000.0 if metrics_consistency_on 
else 123.0, tags=[]
+            metric="empty_timer", value=123000.0 if timer_unit_consistency 
else 123.0, tags=[]
         )
 
     def test_gauge(self):

Reply via email to