ashb commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r2026525706


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -965,6 +993,154 @@ def _update_dag_run_state_for_paused_dags(self, session: 
Session = NEW_SESSION)
         except Exception as e:  # should not fail the scheduler
             self.log.exception("Failed to update dag run state for paused dags 
due to %s", e)
 
+    @provide_session
+    def _end_active_spans(self, session: Session = NEW_SESSION):
+        # No need to do a commit for every update. The annotation will commit 
all of them once at the end.
+        for key, span in self.active_spans.get_all().items():
+            from airflow.models.taskinstance import TaskInstanceKey
+
+            if isinstance(key, TaskInstanceKey):  # ti span.
+                # Can't compare the key directly because the try_number or the 
map_index might not be the same.
+                ti: TaskInstance = session.scalars(
+                    select(TaskInstance).where(
+                        TaskInstance.dag_id == key.dag_id,
+                        TaskInstance.task_id == key.task_id,
+                        TaskInstance.run_id == key.run_id,
+                    )
+                ).one()
+                if ti.state in State.finished:
+                    self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
+                    span.end(end_time=datetime_to_nano(ti.end_date))
+                    ti.span_status = SpanStatus.ENDED
+                else:
+                    span.end()
+                    ti.span_status = SpanStatus.NEEDS_CONTINUANCE

Review Comment:
   Follow-up work: this should probably look at `ti.try_id` instead of
   `TaskInstanceKey`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to