xBis7 commented on code in PR #62554:
URL: https://github.com/apache/airflow/pull/62554#discussion_r2905128039
##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -86,7 +86,7 @@ def make(
from airflow.utils.helpers import log_filename_template_renderer
ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
- ser_ti.parent_context_carrier = ti.dag_run.context_carrier
+ ser_ti.context_carrier = ti.dag_run.context_carrier
Review Comment:
The `parent_context_carrier` held the `context_carrier` of the `dag_run`
span.
Then, in `base_executor.py`, we used that `dag_run` span context as the parent
to start the task span, and stored the task span's `context_carrier` in
`ser_ti.context_carrier`.
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -415,30 +395,6 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
- if isinstance(workload, workloads.ExecuteTask) and
hasattr(workload, "ti"):
- ti = workload.ti
-
- # If it's None, then the span for the current id hasn't been
started.
- if self.active_spans is not None and
self.active_spans.get("ti:" + str(ti.id)) is None:
- if isinstance(ti, TaskInstanceDTO):
- parent_context =
Trace.extract(ti.parent_context_carrier)
- else:
- parent_context =
Trace.extract(ti.dag_run.context_carrier)
- # Start a new span using the context from the parent.
- # Attributes will be set once the task has finished so
that all
- # values will be available (end_time, duration, etc.).
-
- span = Trace.start_child_span(
- span_name=f"{ti.task_id}",
- parent_context=parent_context,
- component="task",
- start_as_current=False,
- )
- self.active_spans.set("ti:" + str(ti.id), span)
- # Inject the current context into the carrier.
- carrier = Trace.inject()
- ti.context_carrier = carrier
Review Comment:
Here, `ti.context_carrier` used to hold the task span's `context_carrier`;
now, in `task.py`, you are setting it to the `dag_run` span's `context_carrier`.
##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -86,7 +86,7 @@ def make(
from airflow.utils.helpers import log_filename_template_renderer
ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
- ser_ti.parent_context_carrier = ti.dag_run.context_carrier
+ ser_ti.context_carrier = ti.dag_run.context_carrier
Review Comment:
<img width="2052" height="860" alt="Image"
src="https://github.com/user-attachments/assets/e98cae49-014f-4de3-a141-971b0a256afd"
/>
I ran the original `test_otel_dag.py` with this code. The task sub-spans
that were created using `ti.context_carrier` are linked as children of the
`dag_run` span, not of the task span.
If users need a context carrier to propagate the trace to another service
(for example, an external API call made from the task) and they obtain it
like so
```
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
context_carrier: dict[str, str] = {}
TraceContextTextMapPropagator().inject(context_carrier)
```
there won't be a problem. But if they use the `context_carrier` field from
the TI, it won't work as expected, because that carrier won't contain the
task span's context.
##########
airflow-core/src/airflow/observability/traces/__init__.py:
##########
@@ -15,3 +15,134 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+import logging
+import os
+from contextlib import contextmanager
+from importlib.metadata import entry_points
+
+from opentelemetry import context, trace
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
+from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+from airflow.configuration import conf
+
+log = logging.getLogger(__name__)
+
+OVERRIDE_SPAN_ID_KEY = context.create_key("override_span_id")
+OVERRIDE_TRACE_ID_KEY = context.create_key("override_trace_id")
+
+
+class OverrideableRandomIdGenerator(RandomIdGenerator):
+ """Lets you override the span id."""
+
+ def generate_span_id(self):
+ override = context.get_value(OVERRIDE_SPAN_ID_KEY)
+ if override is not None:
+ return override
+ return super().generate_span_id()
+
+ def generate_trace_id(self):
+ override = context.get_value(OVERRIDE_TRACE_ID_KEY)
+ if override is not None:
+ return override
+ return super().generate_trace_id()
+
+
+def new_dagrun_trace_carrier() -> dict[str, str]:
+ """Generate a fresh W3C traceparent carrier without creating a recordable
span."""
+ gen = RandomIdGenerator()
+ span_ctx = SpanContext(
+ trace_id=gen.generate_trace_id(),
+ span_id=gen.generate_span_id(),
+ is_remote=False,
+ trace_flags=TraceFlags(TraceFlags.SAMPLED),
+ )
+ ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
+ carrier: dict[str, str] = {}
+ TraceContextTextMapPropagator().inject(carrier, context=ctx)
+ return carrier
+
+
+@contextmanager
+def override_ids(trace_id, span_id, ctx=None):
+ ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
+ ctx = context.set_value(OVERRIDE_SPAN_ID_KEY, span_id, context=ctx)
+ token = context.attach(ctx)
+ try:
+ yield
+ finally:
+ context.detach(token)
+
+
+def _get_backcompat_config() -> tuple[str | None, Resource | None]:
+ """
+ Possibly get deprecated Airflow configs for otel.
+
+ Ideally we return (None, None) here. But if the old configuration is
there,
+ then we will use it.
+ """
+ resource = None
+ if not os.environ.get("OTEL_SERVICE_NAME") and not
os.environ.get("OTEL_RESOURCE_ATTRIBUTES"):
+ service_name = conf.get("traces", "otel_service", fallback=None)
+ if service_name:
+ resource = Resource({"service.name": service_name})
+
+ endpoint = None
+ if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") and not
os.environ.get(
+ "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
+ ):
+ # this is only for backcompat!
+ host = conf.get("traces", "otel_host", fallback=None)
+ port = conf.get("traces", "otel_port", fallback=None)
+ ssl_active = conf.getboolean("traces", "otel_ssl_active",
fallback=False)
+ if host and port:
+ scheme = "https" if ssl_active else "http"
+ endpoint = f"{scheme}://{host}:{port}/v1/traces"
+ return endpoint, resource
+
+
+def _load_exporter_from_env() -> SpanExporter:
+ """
+ Load a span exporter using the OTEL_TRACES_EXPORTER env var.
+
+ Mirrors the entry-point mechanism used by the OTEL SDK auto-instrumentation
+ configurator. Supported values (from installed packages):
+ - ``otlp`` (default) — OTLP/gRPC
+ - ``otlp_proto_http`` — OTLP/HTTP
+ - ``console`` — stdout (useful for debugging)
+ """
+ exporter_name = os.environ.get("OTEL_TRACES_EXPORTER", "otlp")
+ eps = entry_points(group="opentelemetry_traces_exporter",
name=exporter_name)
+ ep = next(iter(eps), None)
+ if ep is None:
+ raise RuntimeError(
+ f"No span exporter found for
OTEL_TRACES_EXPORTER={exporter_name!r}. "
+ f"Available: {[e.name for e in
entry_points(group='opentelemetry_traces_exporter')]}"
+ )
+ return ep.load()()
+
+
+def configure_otel():
+ otel_on = conf.getboolean("traces", "otel_on", fallback=False)
+ if not otel_on:
+ return
+
+ # ideally both endpoint and resource are None here
+ # they would only be something other than None if user is using deprecated
+ # Airflow-defined otel configs
+ endpoint, resource = _get_backcompat_config()
+
+ # backcompat: if old-style host/port config provided an endpoint, set the
+ # env var so the exporter (loaded below) picks it up automatically
+ if endpoint and not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
Review Comment:
Should `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` be checked here as well? Also note
that the SDK gives `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` precedence: if both
variables are set, the signal-specific (traces) endpoint is used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]