dstandish commented on code in PR #62554:
URL: https://github.com/apache/airflow/pull/62554#discussion_r2908270835
##########
airflow-core/src/airflow/observability/traces/__init__.py:
##########
@@ -15,3 +15,134 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+import logging
+import os
+from contextlib import contextmanager
+from importlib.metadata import entry_points
+
+from opentelemetry import context, trace
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
+from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
+from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
+
+from airflow.configuration import conf
+
+log = logging.getLogger(__name__)
+
+OVERRIDE_SPAN_ID_KEY = context.create_key("override_span_id")
+OVERRIDE_TRACE_ID_KEY = context.create_key("override_trace_id")
+
+
+class OverrideableRandomIdGenerator(RandomIdGenerator):
+ """Lets you override the span id."""
+
+ def generate_span_id(self):
+ override = context.get_value(OVERRIDE_SPAN_ID_KEY)
+ if override is not None:
+ return override
+ return super().generate_span_id()
+
+ def generate_trace_id(self):
+ override = context.get_value(OVERRIDE_TRACE_ID_KEY)
+ if override is not None:
+ return override
+ return super().generate_trace_id()
+
+
+def new_dagrun_trace_carrier() -> dict[str, str]:
+ """Generate a fresh W3C traceparent carrier without creating a recordable span."""
+ gen = RandomIdGenerator()
+ span_ctx = SpanContext(
+ trace_id=gen.generate_trace_id(),
+ span_id=gen.generate_span_id(),
+ is_remote=False,
+ trace_flags=TraceFlags(TraceFlags.SAMPLED),
+ )
+ ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
+ carrier: dict[str, str] = {}
+ TraceContextTextMapPropagator().inject(carrier, context=ctx)
+ return carrier
+
+
+@contextmanager
+def override_ids(trace_id, span_id, ctx=None):
+ ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
+ ctx = context.set_value(OVERRIDE_SPAN_ID_KEY, span_id, context=ctx)
+ token = context.attach(ctx)
+ try:
+ yield
+ finally:
+ context.detach(token)
+
+
+def _get_backcompat_config() -> tuple[str | None, Resource | None]:
+ """
+ Possibly get deprecated Airflow configs for otel.
+
+ Ideally we return (None, None) here. But if the old configuration is there,
+ then we will use it.
+ """
+ resource = None
+ if not os.environ.get("OTEL_SERVICE_NAME") and not os.environ.get("OTEL_RESOURCE_ATTRIBUTES"):
+ service_name = conf.get("traces", "otel_service", fallback=None)
+ if service_name:
+ resource = Resource({"service.name": service_name})
+
+ endpoint = None
+ if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") and not os.environ.get(
+ "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
+ ):
+ # this is only for backcompat!
+ host = conf.get("traces", "otel_host", fallback=None)
+ port = conf.get("traces", "otel_port", fallback=None)
+ ssl_active = conf.getboolean("traces", "otel_ssl_active", fallback=False)
+ if host and port:
+ scheme = "https" if ssl_active else "http"
+ endpoint = f"{scheme}://{host}:{port}/v1/traces"
+ return endpoint, resource
+
+
+def _load_exporter_from_env() -> SpanExporter:
+ """
+ Load a span exporter using the OTEL_TRACES_EXPORTER env var.
+
+ Mirrors the entry-point mechanism used by the OTEL SDK auto-instrumentation
+ configurator. Supported values (from installed packages):
+ - ``otlp`` (default) — OTLP/gRPC
+ - ``otlp_proto_http`` — OTLP/HTTP
+ - ``console`` — stdout (useful for debugging)
+ """
+ exporter_name = os.environ.get("OTEL_TRACES_EXPORTER", "otlp")
+ eps = entry_points(group="opentelemetry_traces_exporter", name=exporter_name)
+ ep = next(iter(eps), None)
+ if ep is None:
+ raise RuntimeError(
+ f"No span exporter found for OTEL_TRACES_EXPORTER={exporter_name!r}. "
+ f"Available: {[e.name for e in entry_points(group='opentelemetry_traces_exporter')]}"
+ )
+ return ep.load()()
+
+
+def configure_otel():
+ otel_on = conf.getboolean("traces", "otel_on", fallback=False)
+ if not otel_on:
+ return
+
+ # ideally both endpoint and resource are None here
+ # they would only be something other than None if user is using deprecated
+ # Airflow-defined otel configs
+ endpoint, resource = _get_backcompat_config()
+
+ # backcompat: if old-style host/port config provided an endpoint, set the
+ # env var so the exporter (loaded below) picks it up automatically
+ if endpoint and not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
Review Comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]