ferruzzi commented on code in PR #43340:
URL: https://github.com/apache/airflow/pull/43340#discussion_r1821598308


##########
airflow/metrics/otel_logger.py:
##########
@@ -271,24 +274,29 @@ def gauge(
                 full_name(prefix=self.prefix, name=back_compat_name), value, 
delta, tags
             )
 
-        if self.metrics_validator.test(stat):
-            self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, 
name=stat), value, delta, tags)
+        if self.metrics_validator.test(metric_name):
+            full_metric_name = self.get_name(metric_name, tags)
+            self.metrics_map.set_gauge_value(full_metric_name, value, delta, 
tags)
 
     def timing(
         self,
-        stat: str,
-        dt: DeltaType,
+        metric_name: str,
+        dt: DeltaType | None,
         *,
         tags: Attributes = None,
     ) -> None:
         """OTel does not have a native timer, stored as a Gauge whose value is 
number of seconds elapsed."""
-        if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
+        if dt is None:
+            log.warning("The duration (dt) cannot be None. Skipping timing 
update.")
+            return

Review Comment:
   Can you explain this addition for me?



##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,33 @@
 log = logging.getLogger(__name__)
 
 
-def prepare_stat_with_tags(fn: T) -> T:
-    """Add tags to stat with influxdb standard format if influxdb_tags_enabled 
is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+    """Add tags to metric_name with InfluxDB standard format if 
influxdb_tags_enabled is True."""
 
     @wraps(fn)
     def wrapper(
-        self, stat: str | None = None, *args, tags: dict[str, str] | None = 
None, **kwargs
+        self, metric_name: str | None = None, tags: dict[str, str] | None = 
None
     ) -> Callable[[str], str]:
-        if self.influxdb_tags_enabled:
-            if stat is not None and tags is not None:
-                for k, v in tags.items():
-                    if self.metric_tags_validator.test(k):
-                        if all(c not in [",", "="] for c in f"{v}{k}"):
-                            stat += f",{k}={v}"
-                        else:
-                            log.error("Dropping invalid tag: %s=%s.", k, v)
-        return fn(self, stat, *args, tags=tags, **kwargs)
+        if metric_name is None:
+            metric_name = ""

Review Comment:
   Purely a style nitpick, your call.  Python's ternary assignment would be 
handy here:
   ```suggestion
           metric_name = metric_name or ""
   ```



##########
airflow/metrics/statsd_logger.py:
##########
@@ -51,26 +51,22 @@ def prepare_metric_name_with_tags(fn: T) -> T:
 
     @wraps(fn)
     def wrapper(
-        self, metric_name: str | None = None, tags: dict[str, str | int] | 
None = None
+        self, metric_name: str | None = None, tags: dict[str, str] | None = 
None

Review Comment:
   We've gone back and forth on this one a couple times.  Can you confirm that 
it makes sense how it is now, or did I confuse you and you are just appeasing 
me?  If you need to chat about it, hit me up on Slack and we can talk it 
through.



##########
airflow/metrics/statsd_logger.py:
##########
@@ -44,27 +46,33 @@
 log = logging.getLogger(__name__)
 
 
-def prepare_stat_with_tags(fn: T) -> T:
-    """Add tags to stat with influxdb standard format if influxdb_tags_enabled 
is True."""
+def prepare_metric_name_with_tags(fn: T) -> T:
+    """Add tags to metric_name with InfluxDB standard format if 
influxdb_tags_enabled is True."""
 
     @wraps(fn)
     def wrapper(
-        self, stat: str | None = None, *args, tags: dict[str, str] | None = 
None, **kwargs
+        self, metric_name: str | None = None, tags: dict[str, str] | None = 
None
     ) -> Callable[[str], str]:
-        if self.influxdb_tags_enabled:
-            if stat is not None and tags is not None:
-                for k, v in tags.items():
-                    if self.metric_tags_validator.test(k):
-                        if all(c not in [",", "="] for c in f"{v}{k}"):
-                            stat += f",{k}={v}"
-                        else:
-                            log.error("Dropping invalid tag: %s=%s.", k, v)
-        return fn(self, stat, *args, tags=tags, **kwargs)
+        if metric_name is None:
+            metric_name = ""
+
+        if self.influxdb_tags_enabled and tags:
+            valid_tags: dict[str, str] = {}
+
+            for k, v in tags.items():
+                if self.metric_tags_validator.test(k):
+                    if all(c not in [",", "="] for c in v) and all(c not in 
[",", "="] for c in k):

Review Comment:
   ```suggestion
                      if all(c not in [",", "="] for c in f"{v}{k}"):
   ```
   Since we are checking if those symbols are in either k or v, then checking 
if it is in f'{k}{v}' works just fine and looks much neater.



##########
airflow/metrics/statsd_logger.py:
##########
@@ -79,80 +87,84 @@ def __init__(
         self.influxdb_tags_enabled = influxdb_tags_enabled
         self.metric_tags_validator = metric_tags_validator
 
-    @prepare_stat_with_tags
-    @validate_stat
     def incr(
         self,
-        stat: str,
+        metric_name: str,
         count: int = 1,
         rate: float = 1,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
         """Increment stat."""
-        if self.metrics_validator.test(stat):
-            return self.statsd.incr(stat, count, rate)
+        full_metric_name = self.get_name(metric_name, tags)
+
+        if self.metrics_validator.test(full_metric_name):
+            self.statsd.incr(full_metric_name, count, rate)
         return None
 
-    @prepare_stat_with_tags
-    @validate_stat
     def decr(
         self,
-        stat: str,
+        metric_name: str,
         count: int = 1,
         rate: float = 1,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
         """Decrement stat."""
-        if self.metrics_validator.test(stat):
-            return self.statsd.decr(stat, count, rate)
+        full_metric_name = self.get_name(metric_name, tags)
+        if self.metrics_validator.test(full_metric_name):
+            return self.statsd.decr(full_metric_name, count, rate)
         return None
 
-    @prepare_stat_with_tags
-    @validate_stat
     def gauge(
         self,
-        stat: str,
+        metric_name: str,
         value: int | float,
         rate: float = 1,
         delta: bool = False,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
         """Gauge stat."""
-        if self.metrics_validator.test(stat):
-            return self.statsd.gauge(stat, value, rate, delta)
+        full_metric_name = self.get_name(metric_name, tags)
+        if self.metrics_validator.test(full_metric_name):
+            return self.statsd.gauge(full_metric_name, value, rate, delta)
         return None
 
-    @prepare_stat_with_tags
-    @validate_stat
     def timing(
         self,
-        stat: str,
-        dt: DeltaType,
+        metric_name: str,
+        dt: DeltaType | None,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
         """Stats timing."""
-        if self.metrics_validator.test(stat):
-            return self.statsd.timing(stat, dt)
+        full_metric_name = self.get_name(metric_name, tags)
+        if self.metrics_validator.test(full_metric_name):
+            return self.statsd.timing(full_metric_name, dt)
         return None
 
-    @prepare_stat_with_tags
-    @validate_stat
     def timer(
         self,
-        stat: str | None = None,
+        metric_name: str | None = None,
         *args,
         tags: dict[str, str] | None = None,
         **kwargs,
     ) -> TimerProtocol:
         """Timer metric that can be cancelled."""
-        if stat and self.metrics_validator.test(stat):
-            return Timer(self.statsd.timer(stat, *args, **kwargs))
+        full_metric_name = self.get_name(metric_name, tags)
+        if full_metric_name and self.metrics_validator.test(full_metric_name):
+            return Timer(self.statsd.timer(full_metric_name, *args, **kwargs))
         return Timer()
 
+    @prepare_metric_name_with_tags
+    @validate_stat
+    def get_name(self, metric_name: str, tags: Attributes | None = None) -> 
str:
+        """Get metric name with tags, ensuring no invalid keys are included."""
+        if tags:
+            return f"{metric_name},{','.join(f'{k}={v}' for k, v in 
tags.items())}"

Review Comment:
   Please verify this join.    I seem to remember this should be joined on an 
underscore, not an equals sign?



##########
airflow/metrics/otel_logger.py:
##########
@@ -300,6 +308,18 @@ def timer(
         """Timer context manager returns the duration and can be cancelled."""
         return _OtelTimer(self, stat, tags)
 
+    def get_name(self, metric_name: str, tags: Attributes | None = None) -> 
str:
+        """Generate an OTel-safe metric name with the prefix and delimiter."""
+        if not metric_name:
+            raise InvalidStatsNameException("The stat name cannot be None or 
an empty string.")
+
+        base_metric_name = metric_name

Review Comment:
   Is there a reason not to use `metric_name` directly now that this method has 
been cleaned up?  It appears like you are not modifying either version of the 
parameter, so there shouldn't be a need to make a copy of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to