xBis7 commented on code in PR #62436:
URL: https://github.com/apache/airflow/pull/62436#discussion_r2853451269


##########
task-sdk/src/airflow/sdk/configuration.py:
##########
@@ -266,6 +271,13 @@ def initialize_config() -> AirflowSDKConfigParser:
     return airflow_config_parser
 
 
+provider = TracerProvider()
+exporter = OTLPSpanExporter()
+span_processor = SimpleSpanProcessor(exporter)
+provider.add_span_processor(span_processor)
+trace.set_tracer_provider(provider)
+

Review Comment:
   Will it work if we add 
   
   ```
   from airflow_shared.observability.traces.otel_tracer import configure_otel
   
   configure_otel()
   ```
   
   here and then do 
   
   ```
   tracer = trace.get_tracer(__name__)
   ```
   
   whenever needed?
   
   That would be instead of having the config here and `configure_otel` in `task_runner`.
   



##########
shared/observability/src/airflow_shared/observability/traces/otel_tracer.py:
##########
@@ -18,385 +18,70 @@
 from __future__ import annotations
 
 import logging
-import random
-from contextlib import AbstractContextManager
-from typing import TYPE_CHECKING
+from contextlib import contextmanager
 
-import pendulum
-from opentelemetry import trace
-from opentelemetry.context import attach, create_key
-from opentelemetry.sdk.resources import SERVICE_NAME, Resource
-from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer as 
OpenTelemetryTracer, TracerProvider
+from opentelemetry import context, trace
+from opentelemetry.context import create_key
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import (
-    BatchSpanProcessor,
-    ConsoleSpanExporter,
     SimpleSpanProcessor,
-    SpanExporter,
 )
-from opentelemetry.sdk.trace.id_generator import IdGenerator
-from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, 
TraceFlags, Tracer
-from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
-from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
-
-from ..common import get_otel_data_exporter
-from ..otel_env_config import load_traces_env_config
-from .utils import (
-    datetime_to_nano,
-    parse_traceparent,
-    parse_tracestate,
-)
-
-if TYPE_CHECKING:
-    from opentelemetry.context.context import Context
+from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
 
 log = logging.getLogger(__name__)
 
 _NEXT_ID = create_key("next_id")
 
 
-class OtelTrace:
-    """
-    Handle all tracing requirements such as getting the tracer, and starting a 
new span.
-
-    When OTEL is enabled, the Trace class will be replaced by this class.
-    """
-
-    def __init__(
-        self,
-        span_exporter: SpanExporter,
-        use_simple_processor: bool,
-        tag_string: str | None = None,
-        otel_service: str | None = None,
-        debug: bool = False,
-    ):
-        self.span_exporter = span_exporter
-        self.use_simple_processor = use_simple_processor
-        if self.use_simple_processor:
-            # With a BatchSpanProcessor, spans are exported at an interval.
-            # A task can run fast and finish before spans have enough time to 
get exported to the collector.
-            # When creating spans from inside a task, a SimpleSpanProcessor 
needs to be used because
-            # it exports the spans immediately after they are created.
-            log.info("(otel_tracer.__init__) - [SimpleSpanProcessor] is being 
used")
-            self.span_processor: SpanProcessor = 
SimpleSpanProcessor(self.span_exporter)
-        else:
-            log.info("(otel_tracer.__init__) - [BatchSpanProcessor] is being 
used")
-            self.span_processor = BatchSpanProcessor(self.span_exporter)
-        self.tag_string = tag_string
-        self.otel_service: str = otel_service or "airflow"
-        self.resource = Resource.create(attributes={SERVICE_NAME: 
self.otel_service})
-        self.debug = debug
-
-    def get_otel_tracer_provider(
-        self,
-        trace_id: int | None = None,
-        span_id: int | None = None,
-    ) -> TracerProvider:
-        """
-        Tracer that will use special AirflowOtelIdGenerator to control 
producing certain span and trace id.
-
-        It can be used to get a tracer and directly create spans, or for 
auto-instrumentation.
-        """
-        if trace_id or span_id:
-            # in case where trace_id or span_id was given
-            tracer_provider = TracerProvider(
-                resource=self.resource,
-                id_generator=AirflowOtelIdGenerator(span_id=span_id, 
trace_id=trace_id),
-            )
-        else:
-            tracer_provider = TracerProvider(resource=self.resource)
-        if self.debug is True:
-            log.info("[ConsoleSpanExporter] is being used")
-            if self.use_simple_processor:
-                log.info("[SimpleSpanProcessor] is being used")
-                span_processor_for_tracer_prov: SpanProcessor = 
SimpleSpanProcessor(ConsoleSpanExporter())
-            else:
-                log.info("[BatchSpanProcessor] is being used")
-                span_processor_for_tracer_prov = 
BatchSpanProcessor(ConsoleSpanExporter())
-        else:
-            span_processor_for_tracer_prov = self.span_processor
-
-        tracer_provider.add_span_processor(span_processor_for_tracer_prov)
-        return tracer_provider
-
-    def get_tracer(
-        self,
-        component: str,
-        trace_id: int | None = None,
-        span_id: int | None = None,
-    ) -> OpenTelemetryTracer | Tracer:
-        tracer_provider = self.get_otel_tracer_provider(trace_id=trace_id, 
span_id=span_id)
-        tracer = tracer_provider.get_tracer(component)
-        """
-        Tracer will produce a single ID value if value is provided. Note that 
this is one-time only, so any
-        subsequent call will produce the normal random ids.
-        """
-        return tracer
-
-    def get_current_span(self):
-        return trace.get_current_span()
-
-    def use_span(self, span: Span):
-        return trace.use_span(span=span)
-
-    def start_span(
-        self,
-        span_name: str,
-        component: str | None = None,
-        parent_sc: SpanContext | None = None,
-        span_id=None,
-        links=None,
-        start_time=None,
-    ):
-        """Start a span."""
-        # Common practice is to use the module name.
-        component = component or __name__
-
-        trace_id = self.get_current_span().get_span_context().trace_id
-        tracer = self.get_tracer(component=component, trace_id=trace_id, 
span_id=span_id)
-
-        attributes = parse_tracestate(self.tag_string) if self.tag_string else 
{}
-
-        if links is not None:
-            _links = gen_links_from_kv_list(links)
-        else:
-            _links = []
-
-        if start_time is not None:
-            start_time = datetime_to_nano(start_time)
-
-        if parent_sc is not None:
-            ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
-            span = tracer.start_as_current_span(
-                span_name, context=ctx, attributes=attributes, links=_links, 
start_time=start_time
-            )
-        else:
-            span = tracer.start_as_current_span(
-                span_name, attributes=attributes, links=_links, 
start_time=start_time
-            )
-        return span
-
-    def start_root_span(
-        self,
-        span_name: str,
-        component: str | None = None,
-        links=None,
-        start_time=None,
-        start_as_current: bool = True,
-    ):
-        """Start a root span."""
-        # If no context is passed to the new span,
-        # then it will try to get the context of the current active span.
-        # Due to that, the context parameter can't be empty.
-        # It needs an invalid context in order to declare the new span as root.
-        invalid_span_ctx = SpanContext(
-            trace_id=INVALID_TRACE_ID, span_id=INVALID_SPAN_ID, 
is_remote=True, trace_flags=TraceFlags(0x01)
-        )
-        invalid_ctx = 
trace.set_span_in_context(NonRecordingSpan(invalid_span_ctx))
-
-        if links is None:
-            _links = []
-        else:
-            _links = links
-
-        return self._new_span(
-            span_name=span_name,
-            parent_context=invalid_ctx,
-            component=component,
-            links=_links,
-            start_time=start_time,
-            start_as_current=start_as_current,
-        )
-
-    def start_child_span(
-        self,
-        span_name: str,
-        parent_context: Context | None = None,
-        component: str | None = None,
-        links=None,
-        start_time=None,
-        start_as_current: bool = True,
-    ):
-        """Start a child span."""
-        if parent_context is None:
-            # If no context is passed, then use the current.
-            parent_span_context = trace.get_current_span().get_span_context()
-            parent_context = 
trace.set_span_in_context(NonRecordingSpan(parent_span_context))
-        else:
-            context_val = next(iter(parent_context.values()))
-            parent_span_context = None
-            if isinstance(context_val, NonRecordingSpan):
-                parent_span_context = context_val.get_span_context()
-
-        if links is None:
-            _links = []
-        else:
-            _links = links
-
-        if parent_span_context is not None:
-            _links.append(
-                Link(
-                    context=parent_span_context,
-                    attributes={"meta.annotation_type": "link", "from": 
"parenttrace"},
-                )
-            )
+OVERRIDE_SPAN_ID_KEY = context.create_key("override_span_id")
+OVERRIDE_TRACE_ID_KEY = context.create_key("override_trace_id")
 
-        return self._new_span(
-            span_name=span_name,
-            parent_context=parent_context,
-            component=component,
-            links=_links,
-            start_time=start_time,
-            start_as_current=start_as_current,
-        )
 
-    def _new_span(
-        self,
-        span_name: str,
-        parent_context: Context | None = None,
-        component: str | None = None,
-        links=None,
-        start_time=None,
-        start_as_current: bool = True,
-    ) -> AbstractContextManager[trace.span.Span] | trace.span.Span:
-        # Common practice is to use the module name.
-        component = component or __name__
+class OverrideableRandomIdGenerator(RandomIdGenerator):
+    """Lets you override the span id."""
 
-        tracer = self.get_tracer(component=component)
+    def generate_span_id(self):
+        override = context.get_value(OVERRIDE_SPAN_ID_KEY)
+        if override is not None:
+            context.attach(context.set_value(OVERRIDE_SPAN_ID_KEY, None))
+            return override
+        return super().generate_span_id()
 
-        if start_time is None:
-            start_time = pendulum.now(tz=pendulum.UTC)
+    def generate_trace_id(self):
+        override = context.get_value(OVERRIDE_TRACE_ID_KEY)
+        if override is not None:
+            context.attach(context.set_value(OVERRIDE_TRACE_ID_KEY, None))
+            return override
+        return super().generate_trace_id()
 
-        if links is None:
-            links = []
 
-        if start_as_current:
-            return tracer.start_as_current_span(
-                name=span_name,
-                context=parent_context,
-                links=links,
-                start_time=datetime_to_nano(start_time),
-            )
+@contextmanager
+def override_ids(trace_id, span_id, ctx=None):
+    ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
+    ctx = context.set_value(OVERRIDE_SPAN_ID_KEY, span_id, context=ctx)
+    token = context.attach(ctx)
+    try:
+        yield
+    finally:
+        context.detach(token)
 
-        span = tracer.start_span(  # type: ignore[assignment]
-            name=span_name,
-            context=parent_context,
-            links=links,
-            start_time=datetime_to_nano(start_time),
-        )
-        current_span_ctx = 
trace.set_span_in_context(NonRecordingSpan(span.get_span_context()))  # type: 
ignore[attr-defined]
-        # We have to manually make the span context as the active context.
-        # If the span needs to be injected into the carrier, then this is 
needed to make sure
-        # that the injected context will point to the span context that was 
just created.
-        attach(current_span_ctx)
-        return span
 
-    def inject(self) -> dict:
-        """Inject the current span context into a carrier and return it."""
-        carrier: dict[str, str] = {}
-        TraceContextTextMapPropagator().inject(carrier)
-        return carrier
-
-    def extract(self, carrier: dict) -> Context:
-        """Extract the span context from a provided carrier."""
-        return TraceContextTextMapPropagator().extract(carrier)
-
-
-def gen_context(trace_id: int, span_id: int):
-    """Generate a remote span context for given trace and span id."""
-    span_ctx = SpanContext(trace_id=trace_id, span_id=span_id, is_remote=True, 
trace_flags=TraceFlags(0x01))
-    return span_ctx
-
-
-def gen_links_from_kv_list(kv_list):
-    """Convert list of kv dic of trace_id and span_id and generate list of 
SpanContext."""
-    result = []
-    for a in kv_list:
-        trace_id = a["trace_id"]  # string of hexa
-        span_id = a["span_id"]  # string of hexa
-        span_ctx = gen_context(trace_id, span_id)
-        a_link = Link(
-            context=span_ctx,
-            attributes={"meta.annotation_type": "link"},
-        )
-        result.append(a_link)
-    return result
-
-
-def gen_link_from_traceparent(traceparent: str):
-    """Generate Link object from provided traceparent string."""
-    if traceparent is None:
-        return None
-
-    trace_ctx = parse_traceparent(traceparent)
-    trace_id = trace_ctx["trace_id"]
-    span_id = trace_ctx["parent_id"]
-    span_ctx = gen_context(int(trace_id, 16), int(span_id, 16))
-    return Link(context=span_ctx, attributes={"meta.annotation_type": "link", 
"from": "traceparent"})
-
-
-def get_otel_tracer(
-    cls,
-    use_simple_processor: bool = False,
-    *,
-    host: str | None = None,
-    port: int | None = None,
-    ssl_active: bool = False,
-    otel_service: str | None = None,
-    debug: bool = False,
-) -> OtelTrace:
-    """Get OTEL tracer from the regular OTel env variables or the airflow 
configuration."""
-    otel_env_config = load_traces_env_config()
-
-    tag_string = cls.get_constant_tags()
-
-    exporter = get_otel_data_exporter(
-        otel_env_config=otel_env_config,
-        host=host,
-        port=port,
-        ssl_active=ssl_active,
-    )
-
-    otel_service = otel_env_config.service_name or otel_service
-
-    if otel_env_config.exporter:
-        debug = otel_env_config.exporter == "console"
-
-    log.info("Should use simple processor: %s", use_simple_processor)
-    return OtelTrace(
-        span_exporter=exporter,
-        use_simple_processor=use_simple_processor,
-        tag_string=tag_string,
-        otel_service=otel_service,
-        debug=debug,
-    )
-
-
-class AirflowOtelIdGenerator(IdGenerator):
-    """
-    ID Generator for span id and trace id.
-
-    The specific purpose of this ID generator is to generate a given span_id 
when the
-    generate_span_id is called for the FIRST time. Any subsequent calls to the 
generate_span_id()
-    will then fall back into producing random ones. As for the trace_id, the 
class is designed
-    to produce the provided trace id (and not anything random)
-    """
-
-    def __init__(self, span_id=None, trace_id=None):
-        super().__init__()
-        self.span_id = span_id
-        self.trace_id = trace_id
-
-    def generate_span_id(self) -> int:
-        if self.span_id is not None:
-            id = self.span_id
-            self.span_id = None
-            return id
-        new_id = random.getrandbits(64)
-        return new_id
-
-    def generate_trace_id(self) -> int:
-        if self.trace_id is not None:
-            id = self.trace_id
-            return id
-        new_id = random.getrandbits(128)
-        return new_id
+#
+#
+# port = conf.getint("traces", "otel_port", fallback=None)
+# host = conf.get("traces", "otel_host", fallback=None)
+# ssl_active = conf.getboolean("traces", "otel_ssl_active", fallback=False)
+# otel_service = conf.get("traces", "otel_service", fallback=None)
+
+# resource = Resource.create(attributes={SERVICE_NAME: otel_service})
+# otel_env_config = load_traces_env_config()
+
+
+def configure_otel():

Review Comment:
   Nice, much simpler! As discussed offline, we still need to read some OTel environment variables, so this call is still going to be needed:
   
   ```
   otel_env_config = load_traces_env_config()
   ```



##########
shared/observability/src/airflow_shared/observability/metrics/otel_logger.py:
##########
@@ -386,6 +387,81 @@ def atexit_register_metrics_flush():
     atexit.register(flush_otel_metrics)
 
 
+def get_otel_data_exporter(
+    *,
+    otel_env_config: OtelEnvConfig,
+    host: str | None = None,
+    port: int | None = None,
+    ssl_active: bool = False,
+) -> SpanExporter | MetricExporter:
+    protocol = "https" if ssl_active else "http"
+
+    # According to the OpenTelemetry Spec, specific config options like 
'OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'
+    # take precedence over generic ones like 'OTEL_EXPORTER_OTLP_ENDPOINT'.
+    env_exporter_protocol = (
+        otel_env_config.type_specific_exporter_protocol or 
otel_env_config.exporter_protocol
+    )
+    env_endpoint = otel_env_config.type_specific_endpoint or 
otel_env_config.base_endpoint
+
+    # If the protocol env var isn't set, then it will be None,
+    # and it will default to an http/protobuf exporter.
+    if env_endpoint and env_exporter_protocol == "grpc":
+        from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import 
OTLPMetricExporter
+        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import 
OTLPSpanExporter

Review Comment:
   We are still going to need this part with the exporter.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -134,14 +130,17 @@
     from airflow.serialization.definitions.dag import SerializedDAG
     from airflow.utils.sqlalchemy import CommitProhibitorGuard
 
-TI = TaskInstance
 DR = DagRun
 DM = DagModel
 
 TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
 """:meta private:"""
 
 
+PS = ParamSpec("PS")
+RT = TypeVar("RT")

Review Comment:
   Can you explain what `PS` and `RT` are used for?



##########
airflow-core/src/airflow/settings.py:
##########
@@ -758,3 +758,7 @@ def initialize():
 # Prefix used by gunicorn workers to indicate they are ready to serve requests
 # Used by GunicornMonitor to track worker readiness via process titles
 GUNICORN_WORKER_READY_PREFIX: str = "[ready] "
+
+from airflow_shared.observability.traces.otel_tracer import configure_otel
+
+configure_otel()

Review Comment:
   I like that this is running once, in a central place.



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1016,131 +1052,39 @@ def is_effective_leaf(task):
         leaf_tis = {ti for ti in tis if ti.task_id in leaf_task_ids if 
ti.state != TaskInstanceState.REMOVED}
         return leaf_tis
 
-    def set_dagrun_span_attrs(self, span: Span | EmptySpan):
-        if self._state == DagRunState.FAILED:
-            span.set_attribute("airflow.dag_run.error", True)
-
-        # Explicitly set the value type to Union[...] to avoid a mypy error.
-        attributes: dict[str, AttributeValueType] = {
-            "airflow.category": "DAG runs",
-            "airflow.dag_run.dag_id": str(self.dag_id),
-            "airflow.dag_run.logical_date": str(self.logical_date),
-            "airflow.dag_run.run_id": str(self.run_id),
-            "airflow.dag_run.queued_at": str(self.queued_at),
-            "airflow.dag_run.run_start_date": str(self.start_date),
-            "airflow.dag_run.run_end_date": str(self.end_date),
-            "airflow.dag_run.run_duration": str(
-                (self.end_date - self.start_date).total_seconds() if 
self.start_date and self.end_date else 0
-            ),
-            "airflow.dag_run.state": str(self._state),
-            "airflow.dag_run.run_type": str(self.run_type),
-            "airflow.dag_run.data_interval_start": 
str(self.data_interval_start),
-            "airflow.dag_run.data_interval_end": str(self.data_interval_end),
-            "airflow.dag_run.conf": str(self.conf),
-        }
-        if span.is_recording():
-            span.add_event(name="airflow.dag_run.queued", 
timestamp=datetime_to_nano(self.queued_at))
-            span.add_event(name="airflow.dag_run.started", 
timestamp=datetime_to_nano(self.start_date))
-            span.add_event(name="airflow.dag_run.ended", 
timestamp=datetime_to_nano(self.end_date))
-        span.set_attributes(attributes)
-
-    def start_dr_spans_if_needed(self, tis: list[TI]):
-        # If there is no value in active_spans, then the span hasn't already 
been started.
-        if self.active_spans is not None and self.active_spans.get("dr:" + 
str(self.id)) is None:
-            if self.span_status == SpanStatus.NOT_STARTED or self.span_status 
== SpanStatus.NEEDS_CONTINUANCE:
-                dr_span = None
-                continue_ti_spans = False
-                if self.span_status == SpanStatus.NOT_STARTED:
-                    dr_span = Trace.start_root_span(
-                        span_name=f"{self.dag_id}",
-                        component="dag",
-                        start_time=self.queued_at,  # This is later converted 
to nano.
-                        start_as_current=False,
-                    )
-                elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
-                    # Use the existing context_carrier to set the initial 
dag_run span as the parent.
-                    parent_context = Trace.extract(self.context_carrier)
-                    with Trace.start_child_span(
-                        span_name="new_scheduler", 
parent_context=parent_context
-                    ) as s:
-                        s.set_attribute("trace_status", "continued")
-
-                    dr_span = Trace.start_child_span(
-                        span_name=f"{self.dag_id}_continued",
-                        parent_context=parent_context,
-                        component="dag",
-                        # No start time
-                        start_as_current=False,
-                    )
-                    # After this span is started, the context_carrier will be 
replaced by the new one.
-                    # New task span will use this span as the parent.
-                    continue_ti_spans = True
-                carrier = Trace.inject()
-                self.context_carrier = carrier
-                self.span_status = SpanStatus.ACTIVE
-                # Set the span in a synchronized dictionary, so that the 
variable can be used to end the span.
-                self.active_spans.set("dr:" + str(self.id), dr_span)
-                self.log.debug(
-                    "DagRun span has been started and the injected 
context_carrier is: %s",
-                    self.context_carrier,
-                )
-                # Start TI spans that also need continuance.
-                if continue_ti_spans:
-                    new_dagrun_context = Trace.extract(self.context_carrier)
-                    for ti in tis:
-                        if ti.span_status == SpanStatus.NEEDS_CONTINUANCE:
-                            ti_span = Trace.start_child_span(
-                                span_name=f"{ti.task_id}_continued",
-                                parent_context=new_dagrun_context,
-                                start_as_current=False,
-                            )
-                            ti_carrier = Trace.inject()
-                            ti.context_carrier = ti_carrier
-                            ti.span_status = SpanStatus.ACTIVE
-                            self.active_spans.set(f"ti:{ti.id}", ti_span)
-            else:
-                self.log.debug(
-                    "Found span_status '%s', while updating state for dag_run 
'%s'",
-                    self.span_status,
-                    self.run_id,
-                )
-
-    def end_dr_span_if_needed(self):
-        if self.active_spans is not None:
-            active_span = self.active_spans.get("dr:" + str(self.id))
-            if active_span is not None:
-                self.log.debug(
-                    "Found active span with span_id: %s, for dag_id: %s, 
run_id: %s, state: %s",
-                    active_span.get_span_context().span_id,
-                    self.dag_id,
-                    self.run_id,
-                    self.state,
-                )
-
-                self.set_dagrun_span_attrs(span=active_span)
-                active_span.end(end_time=datetime_to_nano(self.end_date))
-                # Remove the span from the dict.
-                self.active_spans.delete("dr:" + str(self.id))
-                self.span_status = SpanStatus.ENDED
-            else:
-                if self.span_status == SpanStatus.ACTIVE:
-                    # Another scheduler has started the span.
-                    # Update the DB SpanStatus to notify the owner to end it.
-                    self.span_status = SpanStatus.SHOULD_END
-                elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
-                    # This is a corner case where the scheduler exited 
gracefully
-                    # while the dag_run was almost done.
-                    # Since it reached this point, the dag has finished but 
there has been no time
-                    # to create a new span for the current scheduler.
-                    # There is no need for more spans, update the status on 
the db.
-                    self.span_status = SpanStatus.ENDED
-                else:
-                    self.log.debug(
-                        "No active span has been found for dag_id: %s, run_id: 
%s, state: %s",
-                        self.dag_id,
-                        self.run_id,
-                        self.state,
-                    )
+    def _emit_dagrun_span(self):
+        # resource = Resource.create(
+        #     attributes={
+        #         SERVICE_NAME: "syntheticemitter",
+        #     }
+        # )
+        # otel_env_config = load_traces_env_config()
+        # host = conf.get("traces", "otel_host", fallback=None)
+        # port = conf.getint("traces", "otel_port")
+        # ssl_active = conf.getboolean("traces", "otel_ssl_active", 
fallback=False)
+        # exporter = get_otel_data_exporter(
+        #     otel_env_config=otel_env_config,
+        #     host=host,
+        #     port=port,
+        #     ssl_active=ssl_active,
+        # )
+        # tracer_provider = 
TracerProvider(id_generator=OverrideableRandomIdGenerator())
+        # processor = BatchSpanProcessor(exporter)
+        # tracer_provider.add_span_processor(processor)
+        # trace.set_tracer_provider(tracer_provider)
+        ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
+        span = trace.get_current_span(context=ctx)

Review Comment:
   So you are starting a very short root span in `__init__`, and this span then becomes its child?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to