xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1883671989


##########
airflow/executors/base_executor.py:
##########
@@ -337,6 +345,35 @@ def trigger_tasks(self, open_slots: int) -> None:
         for _ in range(min((open_slots, len(self.queued_tasks)))):
             key, (command, _, queue, ti) = sorted_queue.pop(0)
 
+            # If it's None, then the span for the current TaskInstanceKey 
hasn't been started.
+            if self.active_spans is not None and self.active_spans.get(key) is 
None:
+                from airflow.models.taskinstance import SimpleTaskInstance
+
+                if isinstance(ti, SimpleTaskInstance):
+                    parent_context = Trace.extract(ti.parent_context_carrier)
+                else:
+                    parent_context = Trace.extract(ti.dag_run.context_carrier)
+                # Start a new span using the context from the parent.
+                # Attributes will be set once the task has finished so that all
+                # values will be available (end_time, duration, etc.).
+                span = Trace.start_child_span(
+                    span_name=f"{ti.task_id}",
+                    parent_context=parent_context,
+                    component="task",
+                    start_time=ti.queued_dttm,
+                    start_as_current=False,
+                )
+                self.active_spans.set(key, span)
+                # Inject the current context into the carrier.
+                carrier = Trace.inject()
+                # The carrier needs to be set on the ti, but it can't happen 
here because db calls are expensive.
+                # By the time the db update has finished, another heartbeat 
will have started
+                # and the tasks will have been triggered again.

Review Comment:
   I'll remove the second part of the comment. I initially added it because I 
was printing debug logs and I was seeing that `trigger_tasks` was getting 
called multiple times before the update on the ti had a chance to finish.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to