xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1883769479


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The decorator will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them
+        # has full control over their lifecycle.
+        # This also means that the process that started them is the only one that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and there is a reference
+        # in the active_spans dictionary, then the current scheduler started the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, dag_run=dag_run, dagv=dagv)
+
+                    active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session = NEW_SESSION):

Review Comment:
   If the original span belonged to a task that has already finished and that task had sub-spans, then those sub-spans will be lost. The recreation is meant to handle the scenario where the scheduler wasn't stopped gracefully but exited without doing any cleanup.
   
   If we did this recreation for all spans, we would end up close to the original implementation.
   
   Let's say that we have these spans
   ```
   dagrun       |----------------
   task1              |--------|            -> this is referencing the active dagrun span as the parent
   task1 sub           |---|                -> this is referencing task1 span as the parent
   ```
   
   and the scheduler exits forcefully. In that case, these are the issues:
   * the dagrun span was active and has been lost
   * the task1 span and its sub-span were ended and exported, but they reference the lost dagrun span
       * they are orphaned
   
   We need to recreate the dagrun span, but we are then forced to recreate the ended spans as well in order to associate them with the new root span.
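   
   For intuition, here is a minimal standalone sketch (using the opentelemetry-sdk directly rather than Airflow's `Trace` wrapper; the setup is illustrative, not the scheduler's actual configuration) showing that each exported span records its parent link at creation time, which is why already-exported sub-spans cannot be re-pointed at a recreated root:
   ```
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
   from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

   # In-memory exporter so we can inspect what would have been exported.
   provider = TracerProvider()
   exporter = InMemorySpanExporter()
   provider.add_span_processor(SimpleSpanProcessor(exporter))
   tracer = provider.get_tracer("demo")

   with tracer.start_as_current_span("dagrun"):
       with tracer.start_as_current_span("task1"):
           with tracer.start_as_current_span("task1_sub"):
               pass

   # Each finished span carries the span_id of its parent, frozen at
   # creation. A recreated dagrun span gets a fresh span_id, so nothing
   # already exported can be attached to it after the fact.
   for span in exporter.get_finished_spans():
       parent_id = span.parent.span_id if span.parent else None
       print(span.name, "parent:", parent_id and hex(parent_id))
   ```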
   
   This is how we are handling it (a code sketch follows the diagram below):
   * We start a new dagrun span
   * Create a span for the already-ended task1
       * it needs to reference the new dagrun span as the parent
       * there is no way to know whether there were one or more sub-spans under task1; those are lost
   
   The end result
   ```
   dagrun       |---------------------|       -> recreated
   task1              |--------|              -> recreated
   (task1 sub           |---|                 -> this is lost because it was referencing the old parent)
   task2                       |----|         -> using the new dagrun span as the parent
   ```
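   
   A minimal sketch of the recreation step in plain opentelemetry-api terms. The helper name `recreate_spans_for_dag_run`, the `finished_tis` argument, and the local `datetime_to_nano` are illustrative stand-ins rather than the PR's actual code:
   ```
   from opentelemetry import trace

   tracer = trace.get_tracer("airflow.scheduler")

   def datetime_to_nano(dt):
       # Stand-in for the PR's helper: datetime -> epoch nanoseconds.
       return int(dt.timestamp() * 1_000_000_000)

   def recreate_spans_for_dag_run(dag_run, finished_tis):
       # Start a fresh root span for the dag_run, backdated to its start_date.
       dagrun_span = tracer.start_span(
           name=dag_run.dag_id, start_time=datetime_to_nano(dag_run.start_date)
       )
       parent_ctx = trace.set_span_in_context(dagrun_span)

       # Recreate a span for each already-finished task, parented to the new
       # root and ended with its original timestamps. Sub-spans created under
       # the original task span cannot be recovered.
       for ti in finished_tis:
           ti_span = tracer.start_span(
               name=ti.task_id,
               context=parent_ctx,
               start_time=datetime_to_nano(ti.start_date),
           )
           ti_span.end(end_time=datetime_to_nano(ti.end_date))

       # The new dagrun span stays active; later tasks (task2 above) use it
       # as their parent, and it is ended once the dag_run finishes.
       return dagrun_span
   ```
   Passing the original `start_time`/`end_time` in nanoseconds keeps the recreated spans aligned with the timeline in the diagram, matching what `datetime_to_nano` is used for in the diff.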
   
   


