o-nikolas commented on code in PR #31725:
URL: https://github.com/apache/airflow/pull/31725#discussion_r1223380658


##########
airflow/metrics/otel_logger.py:
##########
@@ -83,10 +95,45 @@ def name_is_otel_safe(prefix: str, name: str) -> bool:
     return bool(stat_name_otel_handler(prefix, name, 
max_length=OTEL_NAME_MAX_LENGTH))
 
 
+def _type_as_str(obj: Instrument) -> str:
+    """
+    Given an OpenTelemetry Instrument, returns the type of instrument as a 
string.
+
+    type() will return something like: <class 
'opentelemetry.sdk.metrics._internal.instrument._Counter'>
+    This extracts that down to just "Counter" (or "Gauge" or whatever) for 
cleaner logging purposes.

Review Comment:
   ```suggestion
       This extracts that down to just "Counter" (or "Gauge", etc.) for cleaner 
logging purposes.
   ```



##########
tests/core/test_otel_logger.py:
##########
@@ -173,13 +171,54 @@ def test_decr_with_rate_limit_works(self, mock_random, 
name):
         self.map[full_name(name)].add.assert_has_calls(expected_calls)
         self.map[full_name(name)].add.call_count == 2
 
-    @mock.patch("warnings.warn")
-    def test_timer_warns_not_implemented(self, mock_warn):
-        with self.stats.timer():
-            mock_warn.assert_called_once_with("OpenTelemetry Timers are not 
yet implemented.")
+    def test_gauge_new_metric(self, name):
+        self.stats.gauge(name, value=1)
+
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+
+    def test_gauge_new_metric_with_tags(self, name):
+        tags = {"hello": "world"}
+        key = _generate_key_name(full_name(name), tags)
+
+        self.stats.gauge(name, value=1, tags=tags)
+
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+        self.map[key].attributes == tags
+
+    def test_gauge_existing_metric(self, name):
+        self.stats.gauge(name, value=1)
+        self.stats.gauge(name, value=2)
+
+        self.meter.get_meter().create_observable_gauge.assert_called_once_with(
+            name=full_name(name), callbacks=ANY
+        )
+        assert self.map[full_name(name)].value == 2
+
+    @mock.patch("random.random", side_effect=[0.1, 0.9])
+    @mock.patch.object(MetricsMap, "set_value")
+    def test_gauge_with_rate_limit_works(self, mock_set_value, mock_random, 
name):
+        # Create the counter and set the value to 1

Review Comment:
   ```suggestion
           # Create the gauge and set the value to 1
   ```



##########
airflow/metrics/otel_logger.py:
##########
@@ -235,6 +301,57 @@ def del_counter(self, name: str, attributes: Attributes = 
None) -> None:
         if key in self.map.keys():
             del self.map[key]
 
+    def set_value(self, name: str, value: float, delta: bool, tags: 
Attributes):

Review Comment:
   Minor: all the other methods are named `<something>_gauge`, so I feel like
   this one should follow suit?
   ```suggestion
       def set_gauge(self, name: str, value: float, delta: bool, tags: 
Attributes):
   ```



##########
tests/core/test_otel_logger.py:
##########
@@ -173,13 +171,54 @@ def test_decr_with_rate_limit_works(self, mock_random, 
name):
         self.map[full_name(name)].add.assert_has_calls(expected_calls)
         self.map[full_name(name)].add.call_count == 2
 
-    @mock.patch("warnings.warn")
-    def test_timer_warns_not_implemented(self, mock_warn):
-        with self.stats.timer():
-            mock_warn.assert_called_once_with("OpenTelemetry Timers are not 
yet implemented.")
+    def test_gauge_new_metric(self, name):

Review Comment:
   Should there be a test for getting the value of a gauge? All the tests below 
seem to be about setting/creating them.
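
A rough sketch of the shape such a test could take is below. `FakeGaugeMap` is a hypothetical stand-in written only so the example runs on its own; its simplified signatures (no `tags`) and its default reading of 0 are assumptions, not the PR's actual `MetricsMap` behaviour.

```python
class FakeGaugeMap:
    """Hypothetical stand-in for the metrics map; mimics only the two calls used below."""

    def __init__(self) -> None:
        self._readings = {}

    def set_value(self, name: str, value: float, delta: bool) -> None:
        # Assumption: a gauge that was never set reads as 0.
        previous = self._readings.get(name, 0.0)
        self._readings[name] = previous + value if delta else value

    def poke_gauge(self, name: str) -> float:
        # Read the last stored value without modifying it.
        return self._readings.get(name, 0.0)


def test_gauge_value_can_be_read_back() -> None:
    gauges = FakeGaugeMap()
    gauges.set_value("my_gauge", 1, delta=False)  # overwrite: reading becomes 1
    gauges.set_value("my_gauge", 2, delta=True)   # delta: reading becomes 1 + 2
    assert gauges.poke_gauge("my_gauge") == 3
```

The real test would presumably go through `self.stats.gauge(...)` and the existing fixtures rather than a stand-in like this.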



##########
airflow/metrics/otel_logger.py:
##########
@@ -83,10 +95,45 @@ def name_is_otel_safe(prefix: str, name: str) -> bool:
     return bool(stat_name_otel_handler(prefix, name, 
max_length=OTEL_NAME_MAX_LENGTH))
 
 
+def _type_as_str(obj: Instrument) -> str:
+    """
+    Given an OpenTelemetry Instrument, returns the type of instrument as a 
string.

Review Comment:
   super nit:
   ```suggestion
       Given an OpenTelemetry Instrument, returns the type of the instrument as 
a string.
   ```



##########
airflow/metrics/otel_logger.py:
##########
@@ -235,6 +301,57 @@ def del_counter(self, name: str, attributes: Attributes = 
None) -> None:
         if key in self.map.keys():
             del self.map[key]
 
+    def set_value(self, name: str, value: float, delta: bool, tags: 
Attributes):
+        """
+        Overrides the last reading for a Gauge with a new value.
+
+        :param name: The name of the gauge to record.
+        :param value: The new reading to record.
+        :param delta: If True, value is added to the previous reading, else it 
overrides.
+        :param tags: Gauge attributes which were used to generate a unique key 
to store the counter.
+        :returns: None
+        """
+        key: str = _generate_key_name(name, tags)
+        old_value = self.poke_gauge(name, tags)
+        # If delta is true, add the new value to the last reading otherwise 
overwrite it.
+        self.map[key] = Observation(value + (delta * old_value), tags)

Review Comment:
   While neat, I find this a little obscure. I'd go with a more
   straightforward if-statement or ternary operator so that it is more readable
   for future folks, or at the very least add a comment above this line
   explaining it.
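
For illustration, the explicit form could look something like the sketch below. `next_gauge_value` is a hypothetical helper name used only for this example; it is not part of the PR.

```python
def next_gauge_value(value: float, old_value: float, delta: bool) -> float:
    """Return the reading to store for a gauge (hypothetical helper, not in the PR)."""
    # With delta=True the new value is added to the previous reading;
    # otherwise the new value simply replaces it.
    if delta:
        return old_value + value
    return value
```

For finite readings this should give the same result as `value + (delta * old_value)`, without relying on `bool` being treated as 0 or 1.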



##########
airflow/metrics/otel_logger.py:
##########
@@ -154,9 +201,33 @@ def gauge(
         delta: bool = False,
         *,
         tags: Attributes = None,
+        back_compat_name: str = "",
     ) -> None:
-        warnings.warn("OpenTelemetry Gauges are not yet implemented.")
-        return None
+        """
+        Record a new value for a Gauge.
+
+        :param stat: The name of the stat to update.
+        :param value: The new value of stat, either a float or an int.
+        :param rate: value between 0 and 1 that represents the sample rate at
+            which the metric is going to be emitted.
+        :param delta: If true, the provided value will be added to the 
previous value.
+            If False the new value will override the previous.
+        :param tags: Tags to append to the stat.
+        :param back_compat_name:  If an alternative name is provided, the
+            stat will be emitted using both names if possible.
+        """
+        if rate < 0:
+            raise ValueError("rate must be a positive value.")
+        if rate < 1 and random.random() > rate:
+            return

Review Comment:
   Probably worth factoring this out into a helper instead of reimplementing it
   twice (soon to be three times, with timers).
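
A minimal sketch of what such a helper might look like is below. The name `should_skip_sample` is hypothetical and chosen only for this example; the checks themselves are copied from the hunk above.

```python
import random


def should_skip_sample(rate: float) -> bool:
    """Hypothetical helper: validate `rate` and decide whether to skip this emission."""
    if rate < 0:
        raise ValueError("rate must be a positive value.")
    # Same condition as in the diff: sampling only applies when rate is below 1.
    return rate < 1 and random.random() > rate
```

Each caller could then start with `if should_skip_sample(rate): return` instead of repeating the two checks.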



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
