This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 44c7bee37d AIP-49 OTel Timers (#31859)
44c7bee37d is described below
commit 44c7bee37db1fcbaf1563a04747f1128ce9f90a4
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Jun 21 12:07:04 2023 -0700
AIP-49 OTel Timers (#31859)
---
airflow/metrics/otel_logger.py | 46 +++++++++++++++++-----
tests/core/test_otel_logger.py | 88 +++++++++++++++++++++++++++++++++++++-----
2 files changed, 115 insertions(+), 19 deletions(-)
diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index 7ec4118a71..6f1b2f9697 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import datetime
import logging
import random
import warnings
@@ -36,7 +37,6 @@ from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
AllowListValidator,
stat_name_otel_handler,
- validate_stat,
)
log = logging.getLogger(__name__)
@@ -131,6 +131,29 @@ def _skip_due_to_rate(rate: float) -> bool:
return rate < 1 and random.random() > rate
+class _OtelTimer(Timer):
+ """
+ An implementation of Stats.Timer() which records the result in the OTel Metrics Map.
+ OpenTelemetry does not have a native timer, we will store the values as a Gauge.
+
+ :param name: The name of the timer.
+ :param tags: Tags to append to the timer.
+ """
+
+ def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: Attributes):
+ super().__init__()
+ self.otel_logger = otel_logger
+ self.name = name
+ self.tags = tags
+
+ def stop(self, send: bool = True) -> None:
+ super().stop(send)
+ if self.name and send:
+ self.otel_logger.metrics_map.set_gauge_value(
+ full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags
+ )
+
+
class SafeOtelLogger:
"""Otel Logger."""
@@ -198,7 +221,6 @@ class SafeOtelLogger:
counter.add(-count, attributes=tags)
return counter
- @validate_stat
def gauge(
self,
stat: str,
@@ -233,7 +255,6 @@ class SafeOtelLogger:
if self.metrics_validator.test(stat):
self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), value, delta, tags)
- @validate_stat
def timing(
self,
stat: str,
@@ -241,10 +262,15 @@ class SafeOtelLogger:
*,
tags: Attributes = None,
) -> None:
- warnings.warn(f"Create timer {stat}: OpenTelemetry Timers are not yet implemented.")
- return None
+ """
+ OpenTelemetry does not have a native timer, they are stored
+ as a Gauge whose value represents the seconds elapsed.
+ """
+ if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
+ if isinstance(dt, datetime.timedelta):
+ dt = dt.total_seconds()
+ self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)
- @validate_stat
def timer(
self,
stat: str | None = None,
@@ -252,8 +278,8 @@ class SafeOtelLogger:
tags: Attributes = None,
**kwargs,
) -> TimerProtocol:
- warnings.warn(f"Create timer {stat}: OpenTelemetry Timers are not yet implemented.")
- return Timer()
+ """Timer context manager returns the duration and can be cancelled."""
+ return _OtelTimer(self, stat, tags)
class MetricsMap:
@@ -304,7 +330,7 @@ class MetricsMap:
if key in self.map.keys():
del self.map[key]
- def set_gauge_value(self, name: str, value: float, delta: bool, tags: Attributes):
+ def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Attributes):
"""
Overrides the last reading for a Gauge with a new value.
@@ -315,7 +341,7 @@ class MetricsMap:
:returns: None
"""
key: str = _generate_key_name(name, tags)
- new_value = value
+ new_value = value or DEFAULT_GAUGE_VALUE
old_value = self.poke_gauge(name, tags)
if delta:
new_value += old_value
diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py
index dcffa2d60e..75f4ebdc80 100644
--- a/tests/core/test_otel_logger.py
+++ b/tests/core/test_otel_logger.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import logging
+import time
from unittest import mock
from unittest.mock import ANY
@@ -121,6 +122,7 @@ class TestOtelMetrics:
self.stats.incr(name)
assert self.map[full_name(name)].add.call_count == 2
+ self.meter.get_meter().create_counter.assert_called_once_with(name=full_name(name))
@mock.patch("random.random", side_effect=[0.1, 0.9])
def test_incr_with_rate_limit_works(self, mock_random, name):
@@ -227,13 +229,81 @@ class TestOtelMetrics:
assert self.map[full_name(name)].value == 1
- @mock.patch("warnings.warn")
- def test_timer_warns_not_implemented(self, mock_warn):
- class MessageContaining(str):
- def __eq__(self, other):
- return self in other
+ def test_timing_new_metric(self, name):
+ self.stats.timing(name, dt=123)
+
+ self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+ name=full_name(name), callbacks=ANY
+ )
+
+ def test_timing_new_metric_with_tags(self, name):
+ tags = {"hello": "world"}
+ key = _generate_key_name(full_name(name), tags)
+
+ self.stats.timing(name, dt=1, tags=tags)
+
+ self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+ name=full_name(name), callbacks=ANY
+ )
+ self.map[key].attributes == tags
+
+ def test_timing_existing_metric(self, name):
+ self.stats.timing(name, dt=1)
+ self.stats.timing(name, dt=2)
- with self.stats.timer():
- mock_warn.assert_called_once_with(
- MessageContaining("OpenTelemetry Timers are not yet implemented.")
- )
+ self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+ name=full_name(name), callbacks=ANY
+ )
+ assert self.map[full_name(name)].value == 2
+
+ # For the four test_timer_foo tests below:
+ # time.perf_counter() is called once to get the starting timestamp and again
+ # to get the end timestamp. timer() should return the difference as a float.
+
+ @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+ def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name):
+ with self.stats.timer(name) as timer:
+ pass
+
+ assert isinstance(timer.duration, float)
+ assert timer.duration == 3.14
+ assert mock_time.call_count == 2
+ self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+ name=full_name(name), callbacks=ANY
+ )
+
+ @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+ def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, name):
+ with self.stats.timer() as timer:
+ pass
+
+ assert isinstance(timer.duration, float)
+ assert timer.duration == 3.14
+ assert mock_time.call_count == 2
+ self.meter.get_meter().create_observable_gauge.assert_not_called()
+
+ @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+ def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
+ timer = self.stats.timer(name)
+ timer.start()
+ # Perform some task
+ timer.stop(send=False)
+
+ assert isinstance(timer.duration, float)
+ assert timer.duration == 3.14
+ assert mock_time.call_count == 2
+ self.meter.get_meter().create_observable_gauge.assert_not_called()
+
+ @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
+ def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
+ timer = self.stats.timer(name)
+ timer.start()
+ # Perform some task
+ timer.stop(send=True)
+
+ assert isinstance(timer.duration, float)
+ assert timer.duration == 3.14
+ assert mock_time.call_count == 2
+ self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+ name=full_name(name), callbacks=ANY
+ )