This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 44c7bee37d AIP-49 OTel Timers (#31859)
44c7bee37d is described below

commit 44c7bee37db1fcbaf1563a04747f1128ce9f90a4
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Jun 21 12:07:04 2023 -0700

    AIP-49 OTel Timers (#31859)
---
 airflow/metrics/otel_logger.py | 46 +++++++++++++++++-----
 tests/core/test_otel_logger.py | 88 +++++++++++++++++++++++++++++++++++++-----
 2 files changed, 115 insertions(+), 19 deletions(-)

diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index 7ec4118a71..6f1b2f9697 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import datetime
 import logging
 import random
 import warnings
@@ -36,7 +37,6 @@ from airflow.metrics.validators import (
     OTEL_NAME_MAX_LENGTH,
     AllowListValidator,
     stat_name_otel_handler,
-    validate_stat,
 )
 
 log = logging.getLogger(__name__)
@@ -131,6 +131,29 @@ def _skip_due_to_rate(rate: float) -> bool:
     return rate < 1 and random.random() > rate
 
 
+class _OtelTimer(Timer):
+    """
+    An implementation of Stats.Timer() which records the result in the OTel Metrics Map.
+    OpenTelemetry does not have a native timer, so the values are stored as a Gauge.
+
+    :param name: The name of the timer.
+    :param tags: Tags to append to the timer.
+    """
+
+    def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: 
Attributes):
+        super().__init__()
+        self.otel_logger = otel_logger
+        self.name = name
+        self.tags = tags
+
+    def stop(self, send: bool = True) -> None:
+        super().stop(send)
+        if self.name and send:
+            self.otel_logger.metrics_map.set_gauge_value(
+                full_name(prefix=self.otel_logger.prefix, name=self.name), 
self.duration, False, self.tags
+            )
+
+
 class SafeOtelLogger:
     """Otel Logger."""
 
@@ -198,7 +221,6 @@ class SafeOtelLogger:
             counter.add(-count, attributes=tags)
             return counter
 
-    @validate_stat
     def gauge(
         self,
         stat: str,
@@ -233,7 +255,6 @@ class SafeOtelLogger:
         if self.metrics_validator.test(stat):
             self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, 
name=stat), value, delta, tags)
 
-    @validate_stat
     def timing(
         self,
         stat: str,
@@ -241,10 +262,15 @@ class SafeOtelLogger:
         *,
         tags: Attributes = None,
     ) -> None:
-        warnings.warn(f"Create timer {stat}: OpenTelemetry Timers are not yet 
implemented.")
-        return None
+        """
+        OpenTelemetry does not have a native timer, so timings are stored
+        as a Gauge whose value represents the seconds elapsed.
+        """
+        if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
+            if isinstance(dt, datetime.timedelta):
+                dt = dt.total_seconds()
+            self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, 
name=stat), float(dt), False, tags)
 
-    @validate_stat
     def timer(
         self,
         stat: str | None = None,
@@ -252,8 +278,8 @@ class SafeOtelLogger:
         tags: Attributes = None,
         **kwargs,
     ) -> TimerProtocol:
-        warnings.warn(f"Create timer {stat}: OpenTelemetry Timers are not yet 
implemented.")
-        return Timer()
+        """Timer context manager returns the duration and can be cancelled."""
+        return _OtelTimer(self, stat, tags)
 
 
 class MetricsMap:
@@ -304,7 +330,7 @@ class MetricsMap:
         if key in self.map.keys():
             del self.map[key]
 
-    def set_gauge_value(self, name: str, value: float, delta: bool, tags: 
Attributes):
+    def set_gauge_value(self, name: str, value: float | None, delta: bool, 
tags: Attributes):
         """
         Overrides the last reading for a Gauge with a new value.
 
@@ -315,7 +341,7 @@ class MetricsMap:
         :returns: None
         """
         key: str = _generate_key_name(name, tags)
-        new_value = value
+        new_value = value or DEFAULT_GAUGE_VALUE
         old_value = self.poke_gauge(name, tags)
         if delta:
             new_value += old_value
diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py
index dcffa2d60e..75f4ebdc80 100644
--- a/tests/core/test_otel_logger.py
+++ b/tests/core/test_otel_logger.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import logging
+import time
 from unittest import mock
 from unittest.mock import ANY
 
@@ -121,6 +122,7 @@ class TestOtelMetrics:
         self.stats.incr(name)
 
         assert self.map[full_name(name)].add.call_count == 2
+        
self.meter.get_meter().create_counter.assert_called_once_with(name=full_name(name))
 
     @mock.patch("random.random", side_effect=[0.1, 0.9])
     def test_incr_with_rate_limit_works(self, mock_random, name):
@@ -227,13 +229,81 @@ class TestOtelMetrics:
 
         assert self.map[full_name(name)].value == 1
 
-    @mock.patch("warnings.warn")
-    def test_timer_warns_not_implemented(self, mock_warn):
-        class MessageContaining(str):
-            def __eq__(self, other):
-                return self in other
+    def test_timing_new_metric(self, name):
+        self.stats.timing(name, dt=123)
+
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+
+    def test_timing_new_metric_with_tags(self, name):
+        tags = {"hello": "world"}
+        key = _generate_key_name(full_name(name), tags)
+
+        self.stats.timing(name, dt=1, tags=tags)
+
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+        self.map[key].attributes == tags
+
+    def test_timing_existing_metric(self, name):
+        self.stats.timing(name, dt=1)
+        self.stats.timing(name, dt=2)
 
-        with self.stats.timer():
-            mock_warn.assert_called_once_with(
-                MessageContaining("OpenTelemetry Timers are not yet 
implemented.")
-            )
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+        assert self.map[full_name(name)].value == 2
+
+    # For the four test_timer_foo tests below:
+    #   time.perf_counter() is called once to get the starting timestamp and again
+    #   to get the end timestamp.  timer() should return the difference as a float.
+
+    @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+    def test_timer_with_name_returns_float_and_stores_value(self, mock_time, 
name):
+        with self.stats.timer(name) as timer:
+            pass
+
+        assert isinstance(timer.duration, float)
+        assert timer.duration == 3.14
+        assert mock_time.call_count == 2
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+
+    @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+    def test_timer_no_name_returns_float_but_does_not_store_value(self, 
mock_time, name):
+        with self.stats.timer() as timer:
+            pass
+
+        assert isinstance(timer.duration, float)
+        assert timer.duration == 3.14
+        assert mock_time.call_count == 2
+        self.meter.get_meter().create_observable_gauge.assert_not_called()
+
+    @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+    def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
+        timer = self.stats.timer(name)
+        timer.start()
+        # Perform some task
+        timer.stop(send=False)
+
+        assert isinstance(timer.duration, float)
+        assert timer.duration == 3.14
+        assert mock_time.call_count == 2
+        self.meter.get_meter().create_observable_gauge.assert_not_called()
+
+    @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+    def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
+        timer = self.stats.timer(name)
+        timer.start()
+        # Perform some task
+        timer.stop(send=True)
+
+        assert isinstance(timer.duration, float)
+        assert timer.duration == 3.14
+        assert mock_time.call_count == 2
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )

Reply via email to