xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1883781119


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, 
session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags 
due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The decorator will commit 
all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id 
== dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, 
dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, 
session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, 
with_commit=False
+                )
+                initial_dag_run_context = 
Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", 
parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the 
map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, 
with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, 
session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = 
NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that 
starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them
+        # has full control over their lifecycle.
+        # This also means that the process that started them is the only one 
that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and 
there is a reference
+        # on the active_spans dictionary, then the current scheduler started 
the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == 
SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = 
session.scalar(select(DagVersion).where(DagVersion.id == 
dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, 
dag_run=dag_run, dagv=dagv)
+
+                    
active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, 
session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, 
state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, 
with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, 
session: Session = NEW_SESSION):
+        scheduler_health_timeout = conf.getint("scheduler", 
"scheduler_health_check_threshold")
+
+        # There are two scenarios:
+        #   1. scheduler is unhealthy but managed to update span_status
+        #   2. scheduler is unhealthy and didn't manage to make any updates
+        # Check the span_status first, in case the 2nd db query can be avoided 
(scenario 1).
+
+        # Get the latest values from the db.
+        dr: DagRun = session.scalars(
+            select(DagRun).where(
+                DagRun.run_id == dag_run.run_id,
+                DagRun.dag_id == dag_run.dag_id,
+            )
+        ).one()

Review Comment:
   > I don't think a refresh is needed. This is passed DagRun objects that come 
from DagRun.get_running_dag_runs_to_examine which gets rows via SELECT ... FOR 
UPDATE -- meaning while this transaction is active nothing else can update the 
rows.
   
   You are right! I'll fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to