xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1883734706


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1029,6 +1048,168 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit all of them once at the end.
+        for run_id, span in self.active_dagrun_spans.get_all().items():
+            dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
+            if dag_run.state in State.finished_dr_states:
+                dagv = session.scalar(select(DagVersion).where(DagVersion.id == dag_run.dag_version_id))
+                DagRun.set_dagrun_span_attrs(span=span, dag_run=dag_run, dagv=dagv)
+
+                span.end(end_time=datetime_to_nano(dag_run.end_date))
+                dag_run.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                dag_run.set_span_status(
+                    status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False
+                )
+                initial_dag_run_context = Trace.extract(dag_run.context_carrier)
+                with Trace.start_child_span(
+                    span_name="current_scheduler_exited", parent_context=initial_dag_run_context
+                ) as s:
+                    s.set_attribute("trace_status", "needs continuance")
+
+        for key, span in self.active_ti_spans.get_all().items():
+            # Can't compare the key directly because the try_number or the map_index might not be the same.
+            ti: TaskInstance = session.scalars(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == key.dag_id,
+                    TaskInstance.task_id == key.task_id,
+                    TaskInstance.run_id == key.run_id,
+                )
+            ).one()
+            if ti.state in State.finished:
+                self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                span.end(end_time=datetime_to_nano(ti.end_date))
+                ti.set_span_status(status=SpanStatus.ENDED, session=session, with_commit=False)
+            else:
+                span.end()
+                ti.set_span_status(status=SpanStatus.NEEDS_CONTINUANCE, session=session, with_commit=False)
+
+        self.active_dagrun_spans.clear()
+        self.active_ti_spans.clear()
+
+    @provide_session
+    def _end_spans_of_externally_ended_ops(self, session: Session = NEW_SESSION):
+        # The scheduler that starts a dag_run or a task is also the one that starts the spans.
+        # Each scheduler should end the spans that it has started.
+        #
+        # Otel spans are designed so that only the process that starts them,
+        # has full control over their lifecycle.

Review Comment:
   Yes, but given the way they are implemented, this is forced. Let me rephrase
the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to