ashb commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1881690845


##########
airflow/cli/cli_config.py:
##########
@@ -551,6 +551,9 @@ def string_lower_type(val):
     action="store_true",
 )
 ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
+ARG_CARRIER = Arg(
+    ("-c", "--carrier"), help="Context Carrier, containing the injected 
context for the task span", nargs="?"

Review Comment:
   ```suggestion
       ("-c", "--carrier"), help="Context Carrier, containing the injected 
context for the OTel task span", nargs="?"
   ```
   
   And shouldn't this be `nargs=1`?
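   
   For reference, a minimal standalone argparse sketch (outside Airflow's `Arg` wrapper) showing the difference in question: `nargs="?"` makes the value itself optional, while `nargs=1` requires exactly one value and wraps it in a list.
   
   ```python
   import argparse
   
   p1 = argparse.ArgumentParser()
   p1.add_argument("-c", "--carrier", nargs="?")
   print(p1.parse_args([]))            # Namespace(carrier=None)
   print(p1.parse_args(["-c"]))        # Namespace(carrier=None) -- flag given, value omitted
   print(p1.parse_args(["-c", "{}"]))  # Namespace(carrier='{}')
   
   p2 = argparse.ArgumentParser()
   p2.add_argument("-c", "--carrier", nargs=1)
   print(p2.parse_args(["-c", "{}"]))  # Namespace(carrier=['{}']) -- note the list
   ```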



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -170,6 +172,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
     job_type = "SchedulerJob"
 
+    # key: dag_run.run_id | value: span
+    active_dagrun_spans = ThreadSafeDict()
+    # key: ti.key | value: span
+    active_ti_spans = ThreadSafeDict()

Review Comment:
   Do we need/is it worth having two dicts here? The keys are disjoint between TI and DagRun so we could have one dict of `active_spans`
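   
   For illustration, a sketch of the merged mapping (the `ThreadSafeDict` below is a minimal stub so the sketch runs, not the PR's implementation):
   
   ```python
   import threading
   
   class ThreadSafeDict:
       """Minimal stand-in for the PR's ThreadSafeDict helper."""
       def __init__(self):
           self._lock = threading.Lock()
           self._data = {}
       def set(self, key, value):
           with self._lock:
               self._data[key] = value
       def get(self, key):
           with self._lock:
               return self._data.get(key)
   
   # One mapping for both kinds of span: run_id strings and TaskInstanceKey
   # tuples are different types, so the keys can never collide.
   active_spans = ThreadSafeDict()
   active_spans.set("manual__2024-01-01T00:00:00", "dag_run span")
   active_spans.set(("dag_id", "task_id", "manual__2024-01-01T00:00:00", 1, -1), "ti span")
   print(active_spans.get("manual__2024-01-01T00:00:00"))
   ```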



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.

Review Comment:
   I'm not sure this is really true. It's an implementation choice of the otel library, but it is not an inherent property of the OTel protocol.
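   
   To illustrate the point: the protocol only sees the exported span data, and with the Python SDK any process can emit a span with explicit timestamps after the fact. A hedged sketch (plain SDK, no Airflow wiring):
   
   ```python
   import time
   
   from opentelemetry import trace
   from opentelemetry.sdk.trace import TracerProvider
   
   trace.set_tracer_provider(TracerProvider())
   tracer = trace.get_tracer(__name__)
   
   # Pretend the work started 5s ago in some other process; the span is still
   # valid on the wire because start/end are just timestamps on the export.
   start_ns = time.time_ns() - 5_000_000_000
   span = tracer.start_span("recreated_span", start_time=start_ns)
   span.set_attribute("recreated", True)
   span.end(end_time=time.time_ns())
   ```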



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.
+        # This also means that the process that started them, is the only one that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and there is a reference
+        # on the active_spans dictionary, then the current scheduler started the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, dag_run=dag_run, dagv=dagv)
+
+                    active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session = NEW_SESSION):
+        scheduler_health_timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
+
+        # There are two scenarios:
+        #   1. scheduler is unhealthy but managed to update span_status
+        #   2. scheduler is unhealthy and didn't manage to make any updates
+        # Check the span_status first, in case the 2nd db query can be avoided (scenario 1).
+
+        # Get the latest values from the db.
+        dr: DagRun = session.scalars(
+            select(DagRun).where(
+                DagRun.run_id == dag_run.run_id,
+                DagRun.dag_id == dag_run.dag_id,
+            )
+        ).one()
+
+        # If the dag_run is scheduled by a different scheduler, and it's still running and the span is active,
+        # then check the Job table to determine if the initial scheduler is still healthy.
+        if (
+            dr.scheduled_by_job_id != self.job.id
+            and dr.state in State.unfinished_dr_states
+            and dr.span_status == SpanStatus.ACTIVE
+        ):
+            job: Job = session.scalars(
+                select(Job).where(
+                    Job.id == dr.scheduled_by_job_id,
+                    Job.job_type == "SchedulerJob",
+                )
+            ).one()
+
+            # If the time passed since the last heartbeat is less than the timeout.
+            is_healthy = scheduler_health_timeout > (timezone.utcnow() - job.latest_heartbeat).total_seconds()

Review Comment:
   There is a method on `Job` to do this -- `is_alive()`, please use that rather than duplicating the logic.
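   
   Roughly, against the names in the hunk above (the exact grace behaviour of `is_alive()` may differ slightly from the inline arithmetic):
   
   ```python
   job: Job = session.scalars(
       select(Job).where(
           Job.id == dr.scheduled_by_job_id,
           Job.job_type == "SchedulerJob",
       )
   ).one()
   
   if not job.is_alive():
       # the owning scheduler is gone; recreate the spans
       ...
   ```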



##########
airflow/executors/base_executor.py:
##########
@@ -337,6 +345,35 @@ def trigger_tasks(self, open_slots: int) -> None:
         for _ in range(min((open_slots, len(self.queued_tasks)))):
             key, (command, _, queue, ti) = sorted_queue.pop(0)
 
+            # If it's None, then the span for the current TaskInstanceKey hasn't been started.
+            if self.active_spans is not None and self.active_spans.get(key) is None:
+                from airflow.models.taskinstance import SimpleTaskInstance
+
+                if isinstance(ti, SimpleTaskInstance):
+                    parent_context = Trace.extract(ti.parent_context_carrier)
+                else:
+                    parent_context = Trace.extract(ti.dag_run.context_carrier)
+                # Start a new span using the context from the parent.
+                # Attributes will be set once the task has finished so that all
+                # values will be available (end_time, duration, etc.).
+                span = Trace.start_child_span(
+                    span_name=f"{ti.task_id}",
+                    parent_context=parent_context,
+                    component="task",
+                    start_time=ti.queued_dttm,
+                    start_as_current=False,
+                )
+                self.active_spans.set(key, span)
+                # Inject the current context into the carrier.
+                carrier = Trace.inject()
+                # The carrier needs to be set on the ti, but it can't happen here because db calls are expensive.
+                # By the time the db update has finished, another heartbeat will have started
+                # and the tasks will have been triggered again.

Review Comment:
   Yes, db calls are expensive, but the scheduler is single threaded and non-async, so I don't think this part of the comment is accurate.



##########
airflow/models/dagrun.py:
##########
@@ -1088,6 +1237,123 @@ def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
             finished_tis=finished_tis,
         )
 
+    @staticmethod
+    def _set_scheduled_by_job_id(dag_run: DagRun, job_id: int, session: Session, with_commit: bool) -> bool:
+        if not isinstance(dag_run, DagRun):
+            dag_run = session.scalars(
+                select(DagRun).where(
+                    DagRun.dag_id == dag_run.dag_id,
+                    DagRun.run_id == dag_run.run_id,
+                )
+            ).one()
+
+        if dag_run.scheduled_by_job_id == job_id:
+            return False
+
+        dag_run.log.debug("Setting dag_run scheduled_by_job_id for run_id: %s", dag_run.run_id)
+        dag_run.scheduled_by_job_id = job_id
+
+        session.merge(dag_run)
+
+        if with_commit:
+            session.commit()
+
+        return True
+
+    @provide_session
+    def set_scheduled_by_job_id(
+        self, job_id: int, session: Session = NEW_SESSION, with_commit: bool = False
+    ) -> bool:
+        """
+        Set DagRun scheduled_by_job_id.
+
+        :param job_id: integer with the scheduled_by_job_id to set for the dag_run
+        :param session: SQLAlchemy ORM Session
+        :param with_commit: should the scheduled_by_job_id be committed?
+        :return: has the scheduled_by_job_id been changed?
+        """
+        return self._set_scheduled_by_job_id(
+            dag_run=self, job_id=job_id, session=session, with_commit=with_commit
+        )
+
+    @staticmethod
+    def _set_context_carrier(
+        dag_run: DagRun, context_carrier: dict, session: Session, with_commit: bool
+    ) -> bool:

Review Comment:
   Two comments here:
   
   1. this takes the object as the first argument, so just make it a normal method
   2. I don't think this or the calling function need to exist, just do `dag_run.context_carrier = context_carrier` two levels up from here, and let SQLA do its job.
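   
   For illustration, assuming the column is declared with `MutableDict.as_mutable(...)` (as in the TaskInstanceHistory hunk below), SQLAlchemy's change tracking makes a plain assignment enough:
   
   ```python
   dag_run.context_carrier = context_carrier  # tracked like any other column change
   # in-place edits are tracked too, thanks to MutableDict:
   dag_run.context_carrier["traceparent"] = "00-...-01"
   # the surrounding session flush/commit persists it; no helper method needed
   ```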



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.
+        # This also means that the process that started them, is the only one that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and there is a reference
+        # on the active_spans dictionary, then the current scheduler started the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, dag_run=dag_run, dagv=dagv)
+
+                    active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session = NEW_SESSION):

Review Comment:
   If we can re-create spans for an unhealthy scheduler in the case that it's gone away, couldn't we always do this?
   
   That would mean we wouldn't have to add extra DB queries or columns I think, and not have to have the `active_spans` dicts either?



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.
+        # This also means that the process that started them, is the only one that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and there is a reference
+        # on the active_spans dictionary, then the current scheduler started the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, dag_run=dag_run, dagv=dagv)
+
+                    active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session = NEW_SESSION):
+        scheduler_health_timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
+
+        # There are two scenarios:
+        #   1. scheduler is unhealthy but managed to update span_status
+        #   2. scheduler is unhealthy and didn't manage to make any updates
+        # Check the span_status first, in case the 2nd db query can be avoided (scenario 1).
+
+        # Get the latest values from the db.
+        dr: DagRun = session.scalars(
+            select(DagRun).where(
+                DagRun.run_id == dag_run.run_id,
+                DagRun.dag_id == dag_run.dag_id,
+            )
+        ).one()

Review Comment:
   I don't think a refresh is needed. This is passed DagRun objects that come from `DagRun.get_running_dag_runs_to_examine` which gets rows via `SELECT ... FOR UPDATE` -- meaning while this transaction is active nothing else can update the rows.
   
   (Removing this refresh also means the condition just below and at the callsite can be combined)
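   
   For context, this is the SQLAlchemy shape of the lock being relied on (Airflow's `with_row_locks` helper wraps roughly this):
   
   ```python
   from sqlalchemy import select
   
   # Rows fetched with SELECT ... FOR UPDATE stay locked until the transaction
   # ends, so a re-fetch inside the same transaction can't see newer values anyway.
   locked_runs = session.scalars(
       select(DagRun)
       .where(DagRun.state == DagRunState.RUNNING)
       .with_for_update(skip_locked=True)
   ).all()
   ```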



##########
airflow/models/taskinstance.py:
##########
@@ -3731,6 +3814,12 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
             key=ti.key,
             run_as_user=ti.run_as_user if hasattr(ti, "run_as_user") else None,
             priority_weight=ti.priority_weight if hasattr(ti, "priority_weight") else None,
+            # Inspect the ti, to check if the 'dag_run' relationship is loaded.
+            parent_context_carrier=ti.dag_run.context_carrier
+            if "dag_run" not in inspect(ti).unloaded
+            else None,
+            context_carrier=ti.context_carrier if hasattr(ti, "context_carrier") else None,
+            span_status=ti.span_status if hasattr(ti, "span_status") else None,

Review Comment:
   `ti` will always have this attribute won't it?



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1113,6 +1294,20 @@ def _run_scheduler_loop(self) -> None:
                     self.log.debug("Waiting for processors to finish since 
we're using sqlite")
                     self.processor_agent.wait_until_finished()
 
+                # This is using a new session.
+                self._end_spans_of_externally_ended_ops()
+
+                # Pass a reference to the dictionary.
+                # Any changes made by a dag_run instance, will be reflected to 
the dictionaries of this class.
+                DagRun.set_active_spans(
+                    active_dagrun_spans=self.active_dagrun_spans, 
active_ti_spans=self.active_ti_spans
+                )
+
+                # local import due to type_checking.
+                from airflow.executors.base_executor import BaseExecutor
+
+                
BaseExecutor.set_active_spans(active_spans=self.active_ti_spans)

Review Comment:
   Does this need to be done every time around the loop?



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1751,6 +1946,19 @@ def _schedule_dag_run(
                     "The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
                 )
                 return callback
+
+            if (
+                dag_run.scheduled_by_job_id is not None
+                and dag_run.set_scheduled_by_job_id != self.job.id

Review Comment:
   ```suggestion
                   dag_run.set_scheduled_by_job_id != self.job.id
   ```
   
   Don't think we need the `not None` check. 



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1113,6 +1294,20 @@ def _run_scheduler_loop(self) -> None:
                     self.log.debug("Waiting for processors to finish since 
we're using sqlite")
                     self.processor_agent.wait_until_finished()
 
+                # This is using a new session.
+                self._end_spans_of_externally_ended_ops()

Review Comment:
   I would rather we didn't use `@provide_session` here and were explicit about managing and passing in a session
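   
   i.e. something along these lines (a sketch of the shape, not the exact call sites):
   
   ```python
   from airflow.utils.session import create_session
   
   # signature drops @provide_session; the caller owns the session lifecycle
   def _end_spans_of_externally_ended_ops(self, session: Session) -> None:
       ...
   
   # at the call site in _run_scheduler_loop:
   with create_session() as session:
       self._end_spans_of_externally_ended_ops(session=session)
   ```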



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1751,6 +1946,19 @@ def _schedule_dag_run(
                     "The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
                 )
                 return callback
+
+            if (
+                dag_run.scheduled_by_job_id is not None
+                and dag_run.set_scheduled_by_job_id != self.job.id
+                and self.active_dagrun_spans.get(dag_run.run_id) is None
+            ):
+                # If the dag_run has been previously scheduled by another job 
and there is no active span,
+                # then check if the job is still healthy.
+                # If it's not healthy, then recreate the spans.
+                self._recreate_unhealthy_scheduler_spans_if_needed(dag_run, 
session)
+
+            dag_run.set_scheduled_by_job_id(job_id=self.job.id, 
session=session, with_commit=False)

Review Comment:
   Why are we resetting the scheduled_by here?



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.
+        # This also means that the process that started them, is the only one that can end them.
+        #
+        # If another scheduler has finished processing a dag_run or a task and there is a reference
+        # on the active_spans dictionary, then the current scheduler started the span,
+        # and therefore must end it.
+        dag_runs_should_end: list[DagRun] = session.scalars(
+            select(DagRun).where(DagRun.span_status == SpanStatus.SHOULD_END)
+        ).all()
+        tis_should_end: list[TaskInstance] = session.scalars(
+            select(TaskInstance).where(TaskInstance.span_status == SpanStatus.SHOULD_END)
+        ).all()
+
+        for dag_run in dag_runs_should_end:
+            active_dagrun_span = self.active_dagrun_spans.get(dag_run.run_id)
+            if active_dagrun_span is not None:
+                if dag_run.state in State.finished_dr_states:
+                    dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                    DagRun.set_dagrun_span_attrs(span=active_dagrun_span, dag_run=dag_run, dagv=dagv)
+
+                    active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
+                else:
+                    active_dagrun_span.end()
+                self.active_dagrun_spans.delete(dag_run.run_id)
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+        for ti in tis_should_end:
+            active_ti_span = self.active_ti_spans.get(ti.key)
+            if active_ti_span is not None:
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
+                    active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
+                else:
+                    active_ti_span.end()
+                self.active_ti_spans.delete(ti.key)
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+
+    @provide_session
+    def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session = NEW_SESSION):
+        scheduler_health_timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
+
+        # There are two scenarios:
+        #   1. scheduler is unhealthy but managed to update span_status
+        #   2. scheduler is unhealthy and didn't manage to make any updates
+        # Check the span_status first, in case the 2nd db query can be avoided (scenario 1).
+
+        # Get the latest values from the db.
+        dr: DagRun = session.scalars(
+            select(DagRun).where(
+                DagRun.run_id == dag_run.run_id,
+                DagRun.dag_id == dag_run.dag_id,
+            )
+        ).one()

Review Comment:
   We "just" fetched this. If you really want to get the latest values (which 
I'm not sure it is worth it) then
   
   ```suggestion
           # Get the latest values from the db.
           session.refresh(dag_run)
   ```



##########
airflow/models/dagrun.py:
##########
@@ -829,6 +862,50 @@ def is_effective_leaf(task):
         leaf_tis = {ti for ti in tis if ti.task_id in leaf_task_ids if ti.state != TaskInstanceState.REMOVED}
         return leaf_tis
 
+    @staticmethod
+    def set_dagrun_span_attrs(span: Span | EmptySpan, dag_run: DagRun, dagv: DagVersion):
+        if dag_run._state == DagRunState.FAILED:

Review Comment:
   Given this has a dag run object passed in why isn't it a normal method:
   
   ```suggestion
       def set_dagrun_span_attrs(self, span: Span | EmptySpan, dagv: DagVersion):
           if self._state == DagRunState.FAILED:
   ```



##########
airflow/models/taskinstance.py:
##########
@@ -992,7 +1004,11 @@ def get_triggering_events() -> dict[str, list[AssetEvent | AssetEventPydantic]]:
         # Re-attach it if we get called.
         nonlocal dag_run
         if dag_run not in session:
-            dag_run = session.merge(dag_run, load=False)
+            # In case refresh_from_db has also included the dag_run,
+            # the object will be considered dirty by the session.
+            # Trying to merge the dirty dag_run with load=False will result in an SQLAlchemy error.

Review Comment:
   What was the error? Ideally we shouldn't need to change this
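   
   For context, a minimal reproduction of the SQLAlchemy behaviour being described (illustrative models, not Airflow's): `merge(load=False)` raises `InvalidRequestError` when handed an object with pending changes.
   
   ```python
   from sqlalchemy import String, create_engine
   from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
   
   class Base(DeclarativeBase):
       pass
   
   class Run(Base):
       __tablename__ = "run"
       id: Mapped[int] = mapped_column(primary_key=True)
       name: Mapped[str] = mapped_column(String(50))
   
   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)
   
   with Session(engine) as s1:
       s1.add(Run(id=1, name="a"))
       s1.commit()
       run = s1.get(Run, 1)
       s1.expunge(run)  # detach, like an object handed between sessions
       run.name = "b"   # pending change -> the instance is now "dirty"
   
   with Session(engine) as s2:
       # sqlalchemy.exc.InvalidRequestError: merge() with load=False option
       # does not support objects marked as 'dirty' ...
       s2.merge(run, load=False)
   ```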



##########
airflow/models/taskinstance.py:
##########
@@ -856,7 +861,14 @@ def _refresh_from_db(
     )
 
     if ti:
-        _set_ti_attrs(task_instance, ti, include_dag_run=False)
+        inspector = inspect(ti)
+        # Check if the ti is detached or the dag_run relationship isn't loaded.
+        # If the scheduler that started the dag_run has exited (gracefully or forcefully),
+        # there will be changes to the dag_run span context_carrier.
+        # It's best to include the dag_run whenever possible, so that the ti will contain the updates.
+        include_dag_run = not inspector.detached and "dag_run" not in inspector.unloaded
+        log.debug("Unloaded: %s", inspector.unloaded)

Review Comment:
   This has no side-effect does it? It's just a debug log
   
   
   ```suggestion
   ```



##########
airflow/models/taskinstancehistory.py:
##########
@@ -83,6 +84,8 @@ class TaskInstanceHistory(Base):
     executor_config = Column(ExecutorConfigType(pickler=dill))
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
     rendered_map_index = Column(String(250))
+    context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
+    span_status = Column(String(50), default=SpanStatus.NOT_STARTED)

Review Comment:
   We don't need these on TIHistory -- rows only make it here once they are finished/done, so they will never have a span about them.



##########
airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py:
##########
@@ -236,6 +236,9 @@ def upgrade():
                 log_template_id INTEGER,
                 updated_at TIMESTAMP,
                 clear_number INTEGER DEFAULT '0' NOT NULL,
+                scheduled_by_job_id INTEGER,
+                context_carrier VARCHAR(50),
+                span_status VARCHAR(50),

Review Comment:
   If we need to add new columns then we need to add a new migration file, not edit an existing one.
   
   Is 50 chars enough for the context_carrier? It seems on the short side.



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1751,6 +1946,19 @@ def _schedule_dag_run(
                     "The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
                 )
                 return callback
+
+            if (
+                dag_run.scheduled_by_job_id is not None
+                and dag_run.set_scheduled_by_job_id != self.job.id

Review Comment:
   Also: `set_scheduled_by_job_id` is a function, not a property. I guess that means this isn't covered by unit tests?
   
   ```suggestion
                   dag_run.scheduled_by_job_id != self.job.id
   ```



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1751,6 +1946,19 @@ def _schedule_dag_run(
                     "The DAG disappeared before verifying integrity: %s. 
Skipping.", dag_run.dag_id
                 )
                 return callback
+
+            if (
+                dag_run.scheduled_by_job_id is not None
+                and dag_run.set_scheduled_by_job_id != self.job.id
+                and self.active_dagrun_spans.get(dag_run.run_id) is None
+            ):
+                # If the dag_run has been previously scheduled by another job 
and there is no active span,
+                # then check if the job is still healthy.
+                # If it's not healthy, then recreate the spans.
+                self._recreate_unhealthy_scheduler_spans_if_needed(dag_run, 
session)
+
+            dag_run.set_scheduled_by_job_id(job_id=self.job.id, 
session=session, with_commit=False)

Review Comment:
   Also: this object is locked with a FOR UPDATE, we don't need a method to do this:
   
   
   ```suggestion
               dag_run.scheduled_by_job_id = self.job.id
   ```


