ferruzzi commented on code in PR #43340:
URL: https://github.com/apache/airflow/pull/43340#discussion_r1821598308
##########
airflow/metrics/otel_logger.py:
##########
@@ -271,24 +274,29 @@ def gauge(
full_name(prefix=self.prefix, name=back_compat_name), value,
delta, tags
)
- if self.metrics_validator.test(stat):
- self.metrics_map.set_gauge_value(full_name(prefix=self.prefix,
name=stat), value, delta, tags)
+ if self.metrics_validator.test(metric_name):
+ full_metric_name = self.get_name(metric_name, tags)
+ self.metrics_map.set_gauge_value(full_metric_name, value, delta,
tags)
def timing(
self,
- stat: str,
- dt: DeltaType,
+ metric_name: str,
+ dt: DeltaType | None,
*,
tags: Attributes = None,
) -> None:
"""OTel does not have a native timer, stored as a Gauge whose value is
number of seconds elapsed."""
- if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
+ if dt is None:
+ log.warning("The duration (dt) cannot be None. Skipping timing
update.")
+ return
Review Comment:
Can you explain this addition for me?
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,33 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled
is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if
influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None =
None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str] | None =
None
) -> Callable[[str], str]:
- if self.influxdb_tags_enabled:
- if stat is not None and tags is not None:
- for k, v in tags.items():
- if self.metric_tags_validator.test(k):
- if all(c not in [",", "="] for c in f"{v}{k}"):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
- return fn(self, stat, *args, tags=tags, **kwargs)
+ if metric_name is None:
+ metric_name = ""
Review Comment:
Purely a style nitpick, your call. Python's short-circuit `or` expression would be
handy here:
```suggestion
metric_name = metric_name or ""
```
##########
airflow/metrics/statsd_logger.py:
##########
@@ -51,26 +51,22 @@ def prepare_metric_name_with_tags(fn: T) -> T:
@wraps(fn)
def wrapper(
- self, metric_name: str | None = None, tags: dict[str, str | int] |
None = None
+ self, metric_name: str | None = None, tags: dict[str, str] | None =
None
Review Comment:
We've gone back and forth on this one a couple times. Can you confirm that
it makes sense how it is now, or did I confuse you and you are just appeasing
me? If you need to chat about it, hit me up on Slack and we can talk it
through.
##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,33 @@
log = logging.getLogger(__name__)
-def prepare_stat_with_tags(fn: T) -> T:
- """Add tags to stat with influxdb standard format if influxdb_tags_enabled
is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+ """Add tags to metric_name with InfluxDB standard format if
influxdb_tags_enabled is True."""
@wraps(fn)
def wrapper(
- self, stat: str | None = None, *args, tags: dict[str, str] | None =
None, **kwargs
+ self, metric_name: str | None = None, tags: dict[str, str] | None =
None
) -> Callable[[str], str]:
- if self.influxdb_tags_enabled:
- if stat is not None and tags is not None:
- for k, v in tags.items():
- if self.metric_tags_validator.test(k):
- if all(c not in [",", "="] for c in f"{v}{k}"):
- stat += f",{k}={v}"
- else:
- log.error("Dropping invalid tag: %s=%s.", k, v)
- return fn(self, stat, *args, tags=tags, **kwargs)
+ if metric_name is None:
+ metric_name = ""
+
+ if self.influxdb_tags_enabled and tags:
+ valid_tags: dict[str, str] = {}
+
+ for k, v in tags.items():
+ if self.metric_tags_validator.test(k):
+ if all(c not in [",", "="] for c in v) and all(c not in
[",", "="] for c in k):
Review Comment:
```suggestion
if all(c not in [",", "="] for c in f"{v}{k}"):
```
Since we are checking whether those symbols appear in either k or v, checking
whether they appear in f'{v}{k}' works just as well and looks much neater.
##########
airflow/metrics/statsd_logger.py:
##########
@@ -79,80 +87,84 @@ def __init__(
self.influxdb_tags_enabled = influxdb_tags_enabled
self.metric_tags_validator = metric_tags_validator
- @prepare_stat_with_tags
- @validate_stat
def incr(
self,
- stat: str,
+ metric_name: str,
count: int = 1,
rate: float = 1,
*,
tags: dict[str, str] | None = None,
) -> None:
"""Increment stat."""
- if self.metrics_validator.test(stat):
- return self.statsd.incr(stat, count, rate)
+ full_metric_name = self.get_name(metric_name, tags)
+
+ if self.metrics_validator.test(full_metric_name):
+ self.statsd.incr(full_metric_name, count, rate)
return None
- @prepare_stat_with_tags
- @validate_stat
def decr(
self,
- stat: str,
+ metric_name: str,
count: int = 1,
rate: float = 1,
*,
tags: dict[str, str] | None = None,
) -> None:
"""Decrement stat."""
- if self.metrics_validator.test(stat):
- return self.statsd.decr(stat, count, rate)
+ full_metric_name = self.get_name(metric_name, tags)
+ if self.metrics_validator.test(full_metric_name):
+ return self.statsd.decr(full_metric_name, count, rate)
return None
- @prepare_stat_with_tags
- @validate_stat
def gauge(
self,
- stat: str,
+ metric_name: str,
value: int | float,
rate: float = 1,
delta: bool = False,
*,
tags: dict[str, str] | None = None,
) -> None:
"""Gauge stat."""
- if self.metrics_validator.test(stat):
- return self.statsd.gauge(stat, value, rate, delta)
+ full_metric_name = self.get_name(metric_name, tags)
+ if self.metrics_validator.test(full_metric_name):
+ return self.statsd.gauge(full_metric_name, value, rate, delta)
return None
- @prepare_stat_with_tags
- @validate_stat
def timing(
self,
- stat: str,
- dt: DeltaType,
+ metric_name: str,
+ dt: DeltaType | None,
*,
tags: dict[str, str] | None = None,
) -> None:
"""Stats timing."""
- if self.metrics_validator.test(stat):
- return self.statsd.timing(stat, dt)
+ full_metric_name = self.get_name(metric_name, tags)
+ if self.metrics_validator.test(full_metric_name):
+ return self.statsd.timing(full_metric_name, dt)
return None
- @prepare_stat_with_tags
- @validate_stat
def timer(
self,
- stat: str | None = None,
+ metric_name: str | None = None,
*args,
tags: dict[str, str] | None = None,
**kwargs,
) -> TimerProtocol:
"""Timer metric that can be cancelled."""
- if stat and self.metrics_validator.test(stat):
- return Timer(self.statsd.timer(stat, *args, **kwargs))
+ full_metric_name = self.get_name(metric_name, tags)
+ if full_metric_name and self.metrics_validator.test(full_metric_name):
+ return Timer(self.statsd.timer(full_metric_name, *args, **kwargs))
return Timer()
+ @prepare_metric_name_with_tags
+ @validate_stat
+ def get_name(self, metric_name: str, tags: Attributes | None = None) ->
str:
+ """Get metric name with tags, ensuring no invalid keys are included."""
+ if tags:
+ return f"{metric_name},{','.join(f'{k}={v}' for k, v in
tags.items())}"
Review Comment:
Please verify this join. I seem to remember this should be joined on an
underscore, not an equals sign?
##########
airflow/metrics/otel_logger.py:
##########
@@ -300,6 +308,18 @@ def timer(
"""Timer context manager returns the duration and can be cancelled."""
return _OtelTimer(self, stat, tags)
+ def get_name(self, metric_name: str, tags: Attributes | None = None) ->
str:
+ """Generate an OTel-safe metric name with the prefix and delimiter."""
+ if not metric_name:
+ raise InvalidStatsNameException("The stat name cannot be None or
an empty string.")
+
+ base_metric_name = metric_name
Review Comment:
Is there a reason not to use `metric_name` directly now that this method has
been cleaned up? It appears that you are not modifying either name for the
parameter, so there shouldn't be a need to bind it to a second variable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]