nickstenning commented on code in PR #62436:
URL: https://github.com/apache/airflow/pull/62436#discussion_r2852911950
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1016,131 +1052,39 @@ def is_effective_leaf(task):
leaf_tis = {ti for ti in tis if ti.task_id in leaf_task_ids if
ti.state != TaskInstanceState.REMOVED}
return leaf_tis
- def set_dagrun_span_attrs(self, span: Span | EmptySpan):
- if self._state == DagRunState.FAILED:
- span.set_attribute("airflow.dag_run.error", True)
-
- # Explicitly set the value type to Union[...] to avoid a mypy error.
- attributes: dict[str, AttributeValueType] = {
- "airflow.category": "DAG runs",
- "airflow.dag_run.dag_id": str(self.dag_id),
- "airflow.dag_run.logical_date": str(self.logical_date),
- "airflow.dag_run.run_id": str(self.run_id),
- "airflow.dag_run.queued_at": str(self.queued_at),
- "airflow.dag_run.run_start_date": str(self.start_date),
- "airflow.dag_run.run_end_date": str(self.end_date),
- "airflow.dag_run.run_duration": str(
- (self.end_date - self.start_date).total_seconds() if
self.start_date and self.end_date else 0
- ),
- "airflow.dag_run.state": str(self._state),
- "airflow.dag_run.run_type": str(self.run_type),
- "airflow.dag_run.data_interval_start":
str(self.data_interval_start),
- "airflow.dag_run.data_interval_end": str(self.data_interval_end),
- "airflow.dag_run.conf": str(self.conf),
- }
- if span.is_recording():
- span.add_event(name="airflow.dag_run.queued",
timestamp=datetime_to_nano(self.queued_at))
- span.add_event(name="airflow.dag_run.started",
timestamp=datetime_to_nano(self.start_date))
- span.add_event(name="airflow.dag_run.ended",
timestamp=datetime_to_nano(self.end_date))
- span.set_attributes(attributes)
-
- def start_dr_spans_if_needed(self, tis: list[TI]):
- # If there is no value in active_spans, then the span hasn't already
been started.
- if self.active_spans is not None and self.active_spans.get("dr:" +
str(self.id)) is None:
- if self.span_status == SpanStatus.NOT_STARTED or self.span_status
== SpanStatus.NEEDS_CONTINUANCE:
- dr_span = None
- continue_ti_spans = False
- if self.span_status == SpanStatus.NOT_STARTED:
- dr_span = Trace.start_root_span(
- span_name=f"{self.dag_id}",
- component="dag",
- start_time=self.queued_at, # This is later converted
to nano.
- start_as_current=False,
- )
- elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
- # Use the existing context_carrier to set the initial
dag_run span as the parent.
- parent_context = Trace.extract(self.context_carrier)
- with Trace.start_child_span(
- span_name="new_scheduler",
parent_context=parent_context
- ) as s:
- s.set_attribute("trace_status", "continued")
-
- dr_span = Trace.start_child_span(
- span_name=f"{self.dag_id}_continued",
- parent_context=parent_context,
- component="dag",
- # No start time
- start_as_current=False,
- )
- # After this span is started, the context_carrier will be
replaced by the new one.
- # New task span will use this span as the parent.
- continue_ti_spans = True
- carrier = Trace.inject()
- self.context_carrier = carrier
- self.span_status = SpanStatus.ACTIVE
- # Set the span in a synchronized dictionary, so that the
variable can be used to end the span.
- self.active_spans.set("dr:" + str(self.id), dr_span)
- self.log.debug(
- "DagRun span has been started and the injected
context_carrier is: %s",
- self.context_carrier,
- )
- # Start TI spans that also need continuance.
- if continue_ti_spans:
- new_dagrun_context = Trace.extract(self.context_carrier)
- for ti in tis:
- if ti.span_status == SpanStatus.NEEDS_CONTINUANCE:
- ti_span = Trace.start_child_span(
- span_name=f"{ti.task_id}_continued",
- parent_context=new_dagrun_context,
- start_as_current=False,
- )
- ti_carrier = Trace.inject()
- ti.context_carrier = ti_carrier
- ti.span_status = SpanStatus.ACTIVE
- self.active_spans.set(f"ti:{ti.id}", ti_span)
- else:
- self.log.debug(
- "Found span_status '%s', while updating state for dag_run
'%s'",
- self.span_status,
- self.run_id,
- )
-
- def end_dr_span_if_needed(self):
- if self.active_spans is not None:
- active_span = self.active_spans.get("dr:" + str(self.id))
- if active_span is not None:
- self.log.debug(
- "Found active span with span_id: %s, for dag_id: %s,
run_id: %s, state: %s",
- active_span.get_span_context().span_id,
- self.dag_id,
- self.run_id,
- self.state,
- )
-
- self.set_dagrun_span_attrs(span=active_span)
- active_span.end(end_time=datetime_to_nano(self.end_date))
- # Remove the span from the dict.
- self.active_spans.delete("dr:" + str(self.id))
- self.span_status = SpanStatus.ENDED
- else:
- if self.span_status == SpanStatus.ACTIVE:
- # Another scheduler has started the span.
- # Update the DB SpanStatus to notify the owner to end it.
- self.span_status = SpanStatus.SHOULD_END
- elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
- # This is a corner case where the scheduler exited
gracefully
- # while the dag_run was almost done.
- # Since it reached this point, the dag has finished but
there has been no time
- # to create a new span for the current scheduler.
- # There is no need for more spans, update the status on
the db.
- self.span_status = SpanStatus.ENDED
- else:
- self.log.debug(
- "No active span has been found for dag_id: %s, run_id:
%s, state: %s",
- self.dag_id,
- self.run_id,
- self.state,
- )
+ def _emit_dagrun_span(self):
+ # resource = Resource.create(
+ # attributes={
+ # SERVICE_NAME: "syntheticemitter",
+ # }
+ # )
+ # otel_env_config = load_traces_env_config()
+ # host = conf.get("traces", "otel_host", fallback=None)
+ # port = conf.getint("traces", "otel_port")
+ # ssl_active = conf.getboolean("traces", "otel_ssl_active",
fallback=False)
+ # exporter = get_otel_data_exporter(
+ # otel_env_config=otel_env_config,
+ # host=host,
+ # port=port,
+ # ssl_active=ssl_active,
+ # )
+ # tracer_provider =
TracerProvider(id_generator=OverrideableRandomIdGenerator())
+ # processor = BatchSpanProcessor(exporter)
+ # tracer_provider.add_span_processor(processor)
+ # trace.set_tracer_provider(tracer_provider)
+ ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
+ span = trace.get_current_span(context=ctx)
+ span_context = span.get_span_context()
+ log.warning("setting id overrides", trace_id=span_context.trace_id,
span_id=span_context.span_id)
+ with override_ids(span_context.trace_id, span_context.span_id):
+ span = tracer.start_span(
+ "my-dag-run",
+ start_time=int(self.start_date.timestamp() * 1e9),
+ attributes={"airflow.dag.id": "my_dag"},
+ context=context.Context(),
+ )
+ span.set_status(StatusCode.OK)
Review Comment:
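We should set the span status conditionally on the basis of `self._state`
(for example, an error status when the run is `DagRunState.FAILED`, as the
removed `set_dagrun_span_attrs` did) rather than always `StatusCode.OK`, right?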
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]