This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d43052e53bc Refactor implementation to get OTel tracer (#44216)
d43052e53bc is described below
commit d43052e53bcf9bd8772484b8be4590f869932330
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Nov 20 19:02:30 2024 +0000
Refactor implementation to get OTel tracer (#44216)
Previous implementation had redundant logic in add_span for parameter
inspection:
```python
with Trace.start_span(span_name=func_name, component=component):
if len(inspect.signature(func).parameters) > 0:
return func(*args, **kwargs)
else:
return func()
```
`_Trace` Metaclass was using `__init__` and few more Python-related changes
---
airflow/traces/tracer.py | 78 ++++++++++++++++++++++++++----------------------
1 file changed, 43 insertions(+), 35 deletions(-)
diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py
index 79272d43d00..f116b02ef3b 100644
--- a/airflow/traces/tracer.py
+++ b/airflow/traces/tracer.py
@@ -17,9 +17,9 @@
# under the License.
from __future__ import annotations
-import inspect
import logging
import socket
+from functools import wraps
from typing import TYPE_CHECKING, Any, Callable
from airflow.configuration import conf
@@ -44,20 +44,15 @@ def gen_links_from_kv_list(list):
def add_span(func):
"""Decorate a function with span."""
+ func_name = func.__name__
+ qual_name = func.__qualname__
+ module_name = func.__module__
+ component = qual_name.rsplit(".", 1)[0] if "." in qual_name else
module_name
+ @wraps(func)
def wrapper(*args, **kwargs):
- func_name = func.__name__
- qual_name = func.__qualname__
- module_name = func.__module__
- if "." in qual_name:
- component = f"{qual_name.rsplit('.', 1)[0]}"
- else:
- component = module_name
with Trace.start_span(span_name=func_name, component=component):
- if len(inspect.signature(func).parameters) > 0:
- return func(*args, **kwargs)
- else:
- return func()
+ return func(*args, **kwargs)
return wrapper
@@ -65,8 +60,7 @@ def add_span(func):
class EmptyContext:
"""If no Tracer is configured, EmptyContext is used as a fallback."""
- def __init__(self):
- self.trace_id = 1
+ trace_id = 1
class EmptySpan:
@@ -243,41 +237,55 @@ class EmptyTrace:
return EMPTY_SPAN
-class _Trace(type):
- factory: Callable
+class _TraceMeta(type):
+ factory: Callable[[], Tracer] | None = None
instance: Tracer | EmptyTrace | None = None
- def __getattr__(cls, name: str) -> str:
+ def __getattr__(cls, name: str):
+ if not cls.factory:
+ # Lazy initialization of the factory
+ cls.configure_factory()
if not cls.instance:
- try:
- cls.instance = cls.factory()
- except (socket.gaierror, ImportError) as e:
- log.error("Could not configure Trace: %s, using EmptyTrace
instead.", e)
- cls.instance = EmptyTrace()
+ cls._initialize_instance()
return getattr(cls.instance, name)
- def __init__(cls, *args, **kwargs) -> None:
- super().__init__(cls)
- if not hasattr(cls.__class__, "factory"):
- if conf.has_option("traces", "otel_on") and
conf.getboolean("traces", "otel_on"):
- from airflow.traces import otel_tracer
+ def _initialize_instance(cls):
+ """Initialize the trace instance."""
+ try:
+ cls.instance = cls.factory()
+ except (socket.gaierror, ImportError) as e:
+ log.error("Could not configure Trace: %s. Using EmptyTrace
instead.", e)
+ cls.instance = EmptyTrace()
+
+ def __call__(cls, *args, **kwargs):
+ """Ensure the class behaves as a singleton."""
+ if not cls.instance:
+ cls._initialize_instance()
+ return cls.instance
+
+ @classmethod
+ def configure_factory(cls):
+ """Configure the trace factory based on settings."""
+ if conf.has_option("traces", "otel_on") and conf.getboolean("traces",
"otel_on"):
+ from airflow.traces import otel_tracer
- cls.__class__.factory = otel_tracer.get_otel_tracer
- else:
- cls.__class__.factory = EmptyTrace
+ cls.factory = otel_tracer.get_otel_tracer
+ else:
+ # EmptyTrace is a class and not inherently callable.
+ # Using a lambda ensures it can be invoked as a callable factory.
+ # staticmethod ensures the lambda is treated as a standalone
function
+ # and avoids passing `cls` as an implicit argument.
+ cls.factory = staticmethod(lambda: EmptyTrace())
@classmethod
def get_constant_tags(cls) -> str | None:
"""Get constant tags to add to all traces."""
- tags_in_string = conf.get("traces", "tags", fallback=None)
- if not tags_in_string:
- return None
- return tags_in_string
+ return conf.get("traces", "tags", fallback=None)
if TYPE_CHECKING:
Trace: EmptyTrace
else:
- class Trace(metaclass=_Trace):
+ class Trace(metaclass=_TraceMeta):
"""Empty class for Trace - we use metaclass to inject the right one."""