xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1883775535


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, 
session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags 
due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit 
all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id 
== dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, 
dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, 
session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, 
with_commit=False
+                )
+                initial_dag_run_context = 
Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", 
parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the 
map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, 
with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, 
session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = 
NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that 
starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.
+        # This also means that the process that started them, is the only one 
that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and 
there is a reference
+        # on the active_spans dictionary, then the current scheduler started 
the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == 
SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = 
session.scalar(select(DagVersion).where(DagVersion.id == 
dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, 
dag_run=dag_run, dagv=dagv)
+
+                    
active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, 
session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, 
state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, 
with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, 
session: Session = NEW_SESSION):
+        scheduler_health_timeout = conf.getint("scheduler", 
"scheduler_health_check_threshold")
+
+        # There are two scenarios:
+        #   1. scheduler is unhealthy but managed to update span_status
+        #   2. scheduler is unhealthy and didn't manage to make any updates
+        # Check the span_status first, in case the 2nd db query can be avoided 
(scenario 1).
+
+        # Get the latest values from the db.
+        dr: DagRun = session.scalars(
+            select(DagRun).where(
+                DagRun.run_id == dag_run.run_id,
+                DagRun.dag_id == dag_run.dag_id,
+            )
+        ).one()
+
+        # If the dag_run is scheduled by a different scheduler, and it's still 
running and the span is active,
+        # then check the Job table to determine if the initial scheduler is 
still healthy.
+        if (
+            dr.scheduled_by_job_id != self.job.id
+            and dr.state in State.unfinished_dr_states
+            and dr.span_status == SpanStatus.ACTIVE
+        ):
+            job: Job = session.scalars(
+                select(Job).where(
+                    Job.id == dr.scheduled_by_job_id,
+                    Job.job_type == "SchedulerJob",
+                )
+            ).one()
+
+            # If the time passed since the last heartbeat is less than the 
timeout.
+            is_healthy = scheduler_health_timeout > (timezone.utcnow() - 
job.latest_heartbeat).total_seconds()

Review Comment:
   Thanks for the suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to