ferruzzi commented on code in PR #43340:
URL: https://github.com/apache/airflow/pull/43340#discussion_r1819559003
##########
airflow/metrics/otel_logger.py:
##########
@@ -271,24 +277,33 @@ def gauge(
full_name(prefix=self.prefix, name=back_compat_name), value, delta, tags
)
- if self.metrics_validator.test(stat):
- self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), value, delta, tags)
+ full_metric_name = self.get_name(metric_name, tags)
+ if self.metrics_validator.test(full_metric_name):
Review Comment:
If you do not move the `name_is_otel_safe()` check into `get_name()`, then it
looks like it is missing from this check and should be added here to match the
other methods.
##########
airflow/metrics/base_stats_logger.py:
##########
@@ -31,55 +33,53 @@ class StatsLogger(Protocol):
instance: StatsLogger | NoStatsLogger | None = None
- @classmethod
def incr(
cls,
- stat: str,
+ metric_name: str,
count: int = 1,
rate: int | float = 1,
*,
tags: dict[str, Any] | None = None,
) -> None:
- """Increment stat."""
+ """Increment metric_name."""
- @classmethod
def decr(
cls,
- stat: str,
+ metric_name: str,
count: int = 1,
rate: int | float = 1,
*,
tags: dict[str, Any] | None = None,
) -> None:
- """Decrement stat."""
+ """Decrement metric_name."""
- @classmethod
def gauge(
cls,
- stat: str,
+ metric_name: str,
value: float,
rate: int | float = 1,
delta: bool = False,
*,
tags: dict[str, Any] | None = None,
) -> None:
- """Gauge stat."""
+ """Gauge metric_name."""
- @classmethod
def timing(
cls,
- stat: str,
+ metric_name: str,
dt: DeltaType | None,
*,
tags: dict[str, Any] | None = None,
) -> None:
"""Stats timing."""
- @classmethod
def timer(cls, *args, **kwargs) -> TimerProtocol:
"""Timer metric that can be cancelled."""
raise NotImplementedError()
+ def get_name(self, metric_name: str, tags: Attributes | None = None) -> str:
+ pass
Review Comment:
Consider raising `NotImplementedError()` instead of passing, as seen in
`timer()` above. This will force any implementations (StatsD, Datadog, OTel,
whatever comes later) to implement it.
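A minimal sketch of why this matters, using hypothetical classes rather than the real `StatsLogger` Protocol: a backend that forgets to override the method gets an immediate `NotImplementedError` from the raising version, but silently gets `None` (despite the `-> str` annotation) from the `pass` version.

```python
from __future__ import annotations


class BaseStatsLogger:
    """Hypothetical stand-in for the base class in base_stats_logger.py."""

    def get_name(self, metric_name: str, tags: dict | None = None) -> str:
        # Fail loudly so every concrete logger must override this.
        raise NotImplementedError()


class PassingBaseLogger:
    """Same method with a `pass` body, for comparison."""

    def get_name(self, metric_name: str, tags: dict | None = None) -> str:
        pass  # Silently returns None, despite the `-> str` annotation.


class IncompleteLogger(BaseStatsLogger):
    """Hypothetical backend that forgot to implement get_name()."""


def describe(logger) -> str:
    try:
        return f"returned {logger.get_name('scheduler.heartbeat')!r}"
    except NotImplementedError:
        return "raised NotImplementedError"


print(describe(IncompleteLogger()))   # raised NotImplementedError
print(describe(PassingBaseLogger()))  # returned None
```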
##########
tests/core/test_otel_logger.py:
##########
@@ -356,3 +356,110 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, metrics_consis
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
+
+ def test_incr_counter(self):
+ # Arrange
Review Comment:
I see your pattern and appreciate that you were taught this way, and it may be
required in classes, but in the Airflow codebase the Arrange/Act/Assert
comments aren't needed. A blank line separating the sections of the test is
sufficient; "everyone" knows the intent behind the three "sections"
without adding extra lines of text to the code.
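For illustration, here is a sketch of `test_incr_counter` (added later in this PR) without the Arrange/Act/Assert comments, using blank lines to separate the three sections. To stay self-contained it runs against a hypothetical `FakeOtelLogger` with a `MagicMock` metrics map instead of the real `self.stats` fixture, and it builds the metric name with an f-string instead of `full_name()`.

```python
from unittest import mock


class FakeOtelLogger:
    """Hypothetical stand-in for the OTel stats logger; only incr() is modeled."""

    prefix = "airflow"

    def __init__(self):
        self.metrics_map = mock.MagicMock()

    def incr(self, metric_name, count=1, *, tags=None):
        counter = self.metrics_map.get_counter(f"{self.prefix}.{metric_name}", attributes=tags)
        counter.add(count, attributes=tags)


def test_incr_counter():
    otel_logger = FakeOtelLogger()
    metric_name = "test_metric"
    count = 5
    tags = {"env": "prod"}

    otel_logger.incr(metric_name, count=count, tags=tags)

    # MagicMock returns the same child mock for every get_counter() call.
    counter = otel_logger.metrics_map.get_counter(
        f"{otel_logger.prefix}.{metric_name}", attributes=tags
    )
    counter.add.assert_called_with(count, attributes=tags)
```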
I had not seen AAA before; I kinda like that naming. I was taught it as
"Given, When, Then". Same concept, just different names.
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,38 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str | int] | None = None
) -> Callable[[str], str]:
- if self.influxdb_tags_enabled:
- if stat is not None and tags is not None:
- for k, v in tags.items():
- if self.metric_tags_validator.test(k):
- if all(c not in [",", "="] for c in f"{v}{k}"):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
- return fn(self, stat, *args, tags=tags, **kwargs)
+ # if not isinstance(self, SafeStatsdLogger): # Check if self is an instance of SafeStatsdLogger
+ # raise TypeError("Expected an instance of SafeStatsdLogger")
+
+ if metric_name is None:
+ metric_name = ""
+
+ if self.influxdb_tags_enabled and tags is not None:
Review Comment:
```suggestion
if self.influxdb_tags_enabled and tags:
```
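The suggestion and the current `tags is not None` differ only for an empty dict: `{}` is not `None`, so the current condition enters the tag-formatting branch with nothing to do, while plain truthiness skips both `None` and `{}`. A quick check with two hypothetical helper functions:

```python
from __future__ import annotations


def takes_branch_is_not_none(tags: dict | None) -> bool:
    # Current condition: True even for an empty dict.
    return tags is not None


def takes_branch_truthy(tags: dict | None) -> bool:
    # Suggested condition: False for both None and {}.
    return bool(tags)


for tags in (None, {}, {"env": "prod"}):
    print(repr(tags), takes_branch_is_not_none(tags), takes_branch_truthy(tags))
# None False False
# {} True False
# {'env': 'prod'} True True
```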
##########
tests/core/test_otel_logger.py:
##########
@@ -356,3 +356,110 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, metrics_consis
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
+
Review Comment:
One of the difficulties of unit testing is trying to cover all the edge
cases. All of the tests you added are "happy cases" where things were
expected to work and we make sure that they did. Can you think of any way to
add negative tests where we make sure bad things are caught and handled as
expected? For example, what is expected to happen if we try to call
`get_name()` (for example when we create a new counter) with an invalid name?
Should `get_name()` return that invalid name? Should it log a warning?
Something else entirely?
What other ways could we break the new code, and does the code handle those
edge cases as expected?
Ideas:
Call `get_name()` directly:
1. with an invalid name,
2. with and without tags,
3. etc.
In theory, omitting tags on the OTel one shouldn't change anything, but
having that unit test is a form of regression testing: not just making sure
your code works now, but ensuring nobody breaks it later by accident. So this
may seem like a silly test to include now, but it can save a lot of trouble
down the road.
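A sketch of such negative and regression tests, written against a hypothetical standalone `get_name()` that logs a warning and returns `None` for unsafe names. That is only one of the options raised above; the PR's actual behavior may differ, and the naming regex is an assumption for this sketch, not Airflow's real rule.

```python
from __future__ import annotations

import logging
import re

log = logging.getLogger(__name__)

# Hypothetical naming rule for this sketch only; the real rule lives in
# name_is_otel_safe() and may differ.
_SAFE_NAME = re.compile(r"[A-Za-z][A-Za-z0-9_.\-]{0,62}")


def get_name(metric_name: str, tags: dict | None = None, prefix: str = "airflow") -> str | None:
    """Hypothetical OTel get_name(): warns and returns None for unsafe names."""
    full = f"{prefix}.{metric_name}"
    if not _SAFE_NAME.fullmatch(full):
        log.warning("Invalid OTel metric name: %r", full)
        return None
    return full  # tags are deliberately ignored for OTel names


def test_get_name_valid_without_tags():
    assert get_name("dag.runs") == "airflow.dag.runs"


def test_get_name_tags_do_not_change_name():
    # Regression guard: adding tags must never alter the OTel metric name.
    assert get_name("dag.runs", tags={"env": "prod"}) == get_name("dag.runs")


def test_get_name_rejects_invalid_names():
    # Space, disallowed punctuation, and an over-long name; each with and without tags.
    for bad in ("has space", "semi;colon", "x" * 100):
        for tags in (None, {"env": "prod"}):
            assert get_name(bad, tags) is None, (bad, tags)
```

In the Airflow suite these loops would more naturally be `@pytest.mark.parametrize` cases, and the warning could be asserted with pytest's `caplog` fixture.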
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,38 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str | int] | None = None
Review Comment:
Did you run into a case where this `int` needed to be added? Leaving this
as a `str` doesn't seem like a bad idea at face value.
##########
airflow/metrics/base_stats_logger.py:
##########
@@ -31,55 +33,53 @@ class StatsLogger(Protocol):
instance: StatsLogger | NoStatsLogger | None = None
- @classmethod
Review Comment:
Why are we removing `@classmethod` from all of these? It seems unusual to remove
the decorator but leave the `cls` parameter.
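A small illustration, with hypothetical classes, of why keeping `cls` without the decorator is confusing: the first parameter is then bound to the instance, and calling the method on the class itself no longer works.

```python
class WithClassmethod:
    @classmethod
    def incr(cls, metric_name: str):
        return cls  # bound to the class itself


class WithoutClassmethod:
    def incr(cls, metric_name: str):
        return cls  # despite the name, this is the instance (i.e. `self`)


instance = WithoutClassmethod()
print(WithClassmethod.incr("m") is WithClassmethod)  # True
print(instance.incr("m") is instance)  # True: `cls` is really the instance

try:
    WithoutClassmethod.incr("m")  # "m" binds to `cls`, so metric_name is missing
except TypeError as exc:
    print("TypeError:", exc)
```

If these are meant to be instance methods now, renaming `cls` to `self` would make that explicit.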
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,38 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str | int] | None = None
) -> Callable[[str], str]:
- if self.influxdb_tags_enabled:
- if stat is not None and tags is not None:
- for k, v in tags.items():
- if self.metric_tags_validator.test(k):
- if all(c not in [",", "="] for c in f"{v}{k}"):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
- return fn(self, stat, *args, tags=tags, **kwargs)
+ # if not isinstance(self, SafeStatsdLogger): # Check if self is an instance of SafeStatsdLogger
+ # raise TypeError("Expected an instance of SafeStatsdLogger")
+
+ if metric_name is None:
+ metric_name = ""
+
+ if self.influxdb_tags_enabled and tags is not None:
+ valid_tags: dict[str, str | int] = {}
+ for k, v in tags.items():
Review Comment:
Rather than casting key and value to strings and storing their new values in
separate variables, you could cast the whole dict up front, before this loop,
using a comprehension:
```python
tags = {str(k): str(v) for k, v in tags.items()}
```
then just use `k` and `v` directly in this loop since you know they are already
strings.
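Roughly how the loop could look with the comprehension applied first. This is a hypothetical standalone helper, not the decorator itself: `is_valid_key` stands in for `self.metric_tags_validator.test`, and tags are appended to the name the way the old `stat += f",{k}={v}"` code did.

```python
from __future__ import annotations

import logging
from typing import Callable

log = logging.getLogger(__name__)


def append_influxdb_tags(
    metric_name: str,
    tags: dict[str, str | int],
    is_valid_key: Callable[[str], bool],
) -> str:
    """Hypothetical helper mirroring the wrapper's tag loop."""
    # Cast every key and value once, up front (this builds a new dict).
    tags = {str(k): str(v) for k, v in tags.items()}
    for k, v in tags.items():
        if not is_valid_key(k):
            continue  # keys rejected by the validator are skipped, as before
        # k and v are already strings, so no key_str/value_str copies are needed.
        if any(c in ",=" for c in f"{k}{v}"):
            log.error("Dropping invalid tag: %s=%s.", k, v)
            continue
        metric_name += f",{k}={v}"
    return metric_name


print(append_influxdb_tags("dag.runs", {"env": "prod", "try": 2, "bad": "a=b"}, str.isidentifier))
# dag.runs,env=prod,try=2
```

Because the comprehension rebinds `tags` to a new dict, the caller's dict is left unmodified.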
##########
airflow/metrics/otel_logger.py:
##########
@@ -209,22 +210,26 @@ def incr(
if count < 0:
raise ValueError("count must be a positive value.")
- if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
- counter = self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat), attributes=tags)
+ full_metric_name = self.get_name(metric_name, tags)
+
+ if self.metrics_validator.test(full_metric_name) and name_is_otel_safe(self.prefix, full_metric_name):
Review Comment:
(Here and below) Is there a reason not to move the `name_is_otel_safe()`
check into the otel implementation of `get_name()`? Why have `get_name` return
a name which is not safe to use? That would also reduce the duplicate code in
each of these methods.
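A rough sketch of the suggested shape: the OTel `get_name()` performs the safety check once, so each metric method only needs to handle a rejected name. `full_name()` and `name_is_otel_safe()` below are simplified stand-ins (assumptions) for the real helpers in `airflow/metrics/otel_logger.py`, `SketchOtelLogger` is hypothetical, the `metrics_validator` check is omitted for brevity, and returning `None` for an unsafe name is just one possible design.

```python
from __future__ import annotations

import logging
import re

log = logging.getLogger(__name__)


def full_name(name: str, *, prefix: str) -> str:
    """Simplified stand-in for the real full_name() helper."""
    return f"{prefix}.{name}"


def name_is_otel_safe(prefix: str, name: str) -> bool:
    """Simplified stand-in; the real helper applies Airflow's OTel naming rules."""
    return bool(re.fullmatch(r"[A-Za-z][A-Za-z0-9_.\-]{0,62}", full_name(name, prefix=prefix)))


class SketchOtelLogger:
    """Hypothetical logger with the safety check folded into get_name()."""

    def __init__(self, prefix: str = "airflow"):
        self.prefix = prefix
        self.counts: dict[str, int] = {}

    def get_name(self, metric_name: str, tags: dict | None = None) -> str | None:
        # OTel names ignore tags; unsafe names are rejected here, once.
        if not name_is_otel_safe(self.prefix, metric_name):
            log.warning("Skipping metric with unsafe OTel name: %s", metric_name)
            return None
        return full_name(metric_name, prefix=self.prefix)

    def incr(self, metric_name: str, count: int = 1, *, tags: dict | None = None) -> None:
        name = self.get_name(metric_name, tags)
        if name is None:  # no per-method name_is_otel_safe() call needed
            return
        self.counts[name] = self.counts.get(name, 0) + count
```

`decr()`, `gauge()` and `timing()` would follow the same `if name is None: return` pattern.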
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,38 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str | int] | None = None
) -> Callable[[str], str]:
- if self.influxdb_tags_enabled:
- if stat is not None and tags is not None:
- for k, v in tags.items():
- if self.metric_tags_validator.test(k):
- if all(c not in [",", "="] for c in f"{v}{k}"):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
- return fn(self, stat, *args, tags=tags, **kwargs)
+ # if not isinstance(self, SafeStatsdLogger): # Check if self is an instance of SafeStatsdLogger
+ # raise TypeError("Expected an instance of SafeStatsdLogger")
Review Comment:
I think you meant to remove this?
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,38 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str | int] | None = None
) -> Callable[[str], str]:
- if self.influxdb_tags_enabled:
- if stat is not None and tags is not None:
- for k, v in tags.items():
- if self.metric_tags_validator.test(k):
- if all(c not in [",", "="] for c in f"{v}{k}"):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
- return fn(self, stat, *args, tags=tags, **kwargs)
+ # if not isinstance(self, SafeStatsdLogger): # Check if self is an instance of SafeStatsdLogger
+ # raise TypeError("Expected an instance of SafeStatsdLogger")
+
+ if metric_name is None:
+ metric_name = ""
+
+ if self.influxdb_tags_enabled and tags is not None:
+ valid_tags: dict[str, str | int] = {}
+ for k, v in tags.items():
+ if self.metric_tags_validator.test(k):
+ value_str = str(v)
+ key_str = str(k)
+ if all(c not in [",", "="] for c in value_str) and all(
+ c not in [",", "="] for c in key_str
+ ):
+ valid_tags[k] = value_str
Review Comment:
If you are going to keep using the cast `key_str` value, then I think it
should be used here as well instead of the raw `k`, no?
##########
tests/core/test_otel_logger.py:
##########
@@ -356,3 +356,110 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, metrics_consis
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
+
+ def test_incr_counter(self):
+ # Arrange
+ logger = self.stats
+ metric_name = "test_metric"
+ count = 5
+ tags = {"env": "prod"}
+
+ # Act
+ logger.incr(metric_name, count=count, tags=tags)
+
+ # Assert
+ counter = logger.metrics_map.get_counter(
+ full_name(prefix=logger.prefix, name=metric_name), attributes=tags
+ )
+ counter.add.assert_called_with(count, attributes=tags)
+
+ def test_decr_counter(self):
+ # Arrange
+ logger = self.stats
+ metric_name = "test_metric"
+ count = 3
+ tags = {"env": "prod"}
+
+ # Act
+ logger.decr(metric_name, count=count, tags=tags)
+
+ # Assert
+ counter = logger.metrics_map.get_counter(
+ full_name(prefix=logger.prefix, name=metric_name), attributes=tags
+ )
+ counter.add.assert_called_with(-count, attributes=tags)
+
+ @pytest.mark.parametrize("expected_duration", [2.5, 1.0, 3.0])
+ def test_timing(self, expected_duration):
+ # Arrange
+ logger = self.stats
+ metric_name = "test_metric"
+ tags = {"env": "prod"}
+
+ # Mocking time.perf_counter to simulate timing
+ with mock.patch.object(time, "perf_counter", side_effect=[0.0, expected_duration]):
+ with logger.timer(metric_name, tags=tags) as timer:
+ pass
+
+ # Assert
+ assert isinstance(timer.duration, float)
+ assert timer.duration >= expected_duration - 0.1
+ assert timer.duration <= expected_duration + 0.1
Review Comment:
Non-blocking Nitpick: `0.1` is a "magic number", consider creating a
variable called "acceptable_margin" or something like that.
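One way the assertions could read with a named tolerance. `ACCEPTABLE_MARGIN` and `check_duration` are hypothetical names, and `math.isclose` with `abs_tol` is used here in place of the two-sided `>=`/`<=` pair (it also applies a negligible default `rel_tol` of `1e-9`).

```python
import math

# A named tolerance instead of the bare 0.1 used in the original assertions.
ACCEPTABLE_MARGIN = 0.1


def check_duration(duration: float, expected_duration: float) -> None:
    assert isinstance(duration, float)
    # Essentially the same two-sided window as the original >= / <= checks.
    assert math.isclose(duration, expected_duration, abs_tol=ACCEPTABLE_MARGIN)


check_duration(2.55, 2.5)
```

In pytest style, `assert timer.duration == pytest.approx(expected_duration, abs=ACCEPTABLE_MARGIN)` would express the same check.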
##########
tests/core/test_otel_logger.py:
##########
@@ -356,3 +356,110 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, metrics_consis
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
+
+ def test_incr_counter(self):
+ # Arrange
+ logger = self.stats
Review Comment:
Nitpick: the term "logger" is a bit overloaded in Airflow, and this could be
misread as a console logger, as in `log.error()`. Consider renaming this
(here and below) to `otel` or `otel_logger` to help clarify, or just using
`self.stats` directly.
--
This is an automated message from the Apache Git Service.