potiuk commented on code in PR #30873: URL: https://github.com/apache/airflow/pull/30873#discussion_r1209729139
########## airflow/metrics/otel_logger.py: ########## @@ -0,0 +1,289 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import random +import warnings +from typing import Callable + +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.util.types import Attributes + +from airflow.configuration import conf +from airflow.exceptions import InvalidStatsNameException +from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol +from airflow.metrics.validators import AllowListValidator, stat_name_default_handler, validate_stat + +log = logging.getLogger(__name__) + + +# "airflow.dag_processing.processes" is currently the only UDC used in Airflow. If more are added, +# we should add a better system for this. +# +# Generally in OTel a Counter is monotonic (can only go up) and there is an UpDownCounter which, +# as you can guess, is non-monotonic; it can go up or down. 
The choice here is to either drop +# this one metric and implement the rest as monotonic Counters, implement all counters as +# UpDownCounters, or add a bit of logic to do it intelligently. The catch is that the Collector +# which transmits these metrics to the upstream dashboard tools (Prometheus, Grafana, etc.) assigns +# the type of Gauge to any UDC instead of Counter. Adding this logic feels like the best compromise +# where normal Counters still get typed correctly, and we don't lose an existing metric. +# See: +# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#counter-creation +# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#updowncounter + +UP_DOWN_COUNTERS = {"airflow.dag_processing.processes"} +OTEL_NAME_MAX_LENGTH = 63 +METRIC_NAME_PREFIX = "airflow." + + +def _is_up_down_counter(name): + return name in UP_DOWN_COUNTERS + + +def _generate_key_name(name: str, attributes: Attributes = None): + if attributes: + key = name + for item in attributes.items(): + key += f"_{item[0]}_{item[1]}" + else: + key = name + + return key + + +def name_is_otel_safe(prefix: str, name: str) -> bool: + """ + Returns true is the provided name and prefix would result in a name compatible with Open Telemetry. + Legal names are defined here: + https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax + """ + + def test(value) -> bool: + return bool(stat_name_default_handler(value, max_length=OTEL_NAME_MAX_LENGTH)) + + try: + if not name: + raise InvalidStatsNameException + test(name) + if prefix: + test(prefix) + test(f"{prefix}.{name}") + return True + except InvalidStatsNameException: + log.exception( + f"Invalid stat name: {prefix}.{name}. 
Please see " + f"https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax" + ) + return False + + +class SafeOtelLogger: + """Otel Logger""" + + def __init__(self, otel_provider, prefix: str = "airflow", allow_list_validator=AllowListValidator()): + self.otel: Callable = otel_provider + self.prefix: str = prefix + self.metrics_validator = allow_list_validator + self.meter = otel_provider.get_meter(__name__) + self.metrics_map = MetricsMap(self.meter) + + def incr( + self, + stat: str, + count: int = 1, + rate: float = 1, + tags: Attributes = None, + ): + """ + Increment stat by count. + + :param stat: The name of the stat to increment. + :param count: A positive integer to add to the current value of stat. + :param rate: value between 0 and 1 that represents the sampled rate at + which the metric is going to be emitted. + :param tags: Tags to append to the stat. + """ + if (count < 0) or (rate < 0): + raise ValueError("count and rate must both be positive values.") + if rate < 1 and random.random() > rate: + return + + if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): + counter = self.metrics_map.get_counter(f"{self.prefix}.{stat}", attributes=tags) + counter.add(count, attributes=tags) + return counter + + def decr( + self, + stat: str, + count: int = 1, + rate: float = 1, + tags: Attributes = None, + ): + """ + Decrement stat by count. + + :param stat: The name of the stat to decrement. + :param count: A positive integer to subtract from current value of stat. + :param rate: value between 0 and 1 that represents the sampled rate at + which the metric is going to be emitted. + :param tags: Tags to append to the stat. 
+ """ + if (count < 0) or (rate < 0): + raise ValueError("count and rate must both be positive values.") + if rate < 1 and random.random() > rate: + return + + if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): + counter = self.metrics_map.get_counter(f"{self.prefix}.{stat}") + counter.add(-count, attributes=tags) + return counter + + @validate_stat + def gauge( + self, + stat: str, + value: int | float, + rate: float = 1, + delta: bool = False, + *, + tags: Attributes = None, + ) -> None: + warnings.warn("OpenTelemetry Gauges are not yet implemented.") + return None + + @validate_stat + def timing( + self, + stat: str, + dt: DeltaType, + *, + tags: Attributes = None, + ) -> None: + warnings.warn("OpenTelemetry Timers are not yet implemented.") + return None + + @validate_stat + def timer( + self, + stat: str | None = None, + *args, + tags: Attributes = None, + **kwargs, + ) -> TimerProtocol: + warnings.warn("OpenTelemetry Timers are not yet implemented.") + return Timer() + + +class MetricsMap: + """Stores Otel Instruments.""" + + def __init__(self, meter): + self.meter = meter + self.map = {} + + def clear(self) -> None: + self.map.clear() + + def _create_counter(self, name): + """Creates a new counter or up_down_counter for the provided name.""" + otel_safe_name = name[:OTEL_NAME_MAX_LENGTH] + if name != otel_safe_name: + warnings.warn( Review Comment: Should we simply raise an error in this case? The problem with this warning is that the users who see it will not be able to do anything about it, and those are the worst kinds of warnings. This check should be targeted not at Airflow users but at those who create new metrics, and for them, simply failing here would IMHO be much better, because it would prevent them from adding a wrong metric name in the first place. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
