ferruzzi commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1978363034


##########
airflow/executors/base_executor.py:
##########
@@ -401,6 +409,33 @@ def trigger_tasks(self, open_slots: int) -> None:
                     workloads.append(item)
                 else:
                     (command, _, queue, ti) = item
+                    # If it's None, then the span for the current TaskInstanceKey hasn't been started.
+                    if self.active_spans is not None and self.active_spans.get(key) is None:
+                        from airflow.models.taskinstance import SimpleTaskInstance
+
+                        if isinstance(ti, SimpleTaskInstance):
+                            parent_context = Trace.extract(ti.parent_context_carrier)
+                        else:
+                            parent_context = Trace.extract(ti.dag_run.context_carrier)
+                        # Start a new span using the context from the parent.
+                        # Attributes will be set once the task has finished so that all
+                        # values will be available (end_time, duration, etc.).
+                        span = Trace.start_child_span(
+                            span_name=f"{ti.task_id}",
+                            parent_context=parent_context,
+                            component="task",
+                            start_time=ti.queued_dttm,
+                            start_as_current=False,
+                        )
+                        self.active_spans.set(key, span)
+                        # Inject the current context into the carrier.
+                        carrier = Trace.inject()
+                        # The carrier needs to be set on the ti, but it can't happen here because db calls are expensive.
+                        # So set the carrier as an argument to the command.
+                        # The command execution will set it on the ti, and it will be propagated to the task itself.
+                        command = list(command)
+                        command.append("--carrier")
+                        command.append(json.dumps(carrier))

Review Comment:
   Nitpick, feel free to ignore and resolve:
   
   ```suggestion
                           command = list(command) + ["--carrier", json.dumps(carrier)]
   ```
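
A quick sketch of why the two forms should be interchangeable. The `command` tuple and `carrier` dict here are hypothetical stand-ins, not values taken from this PR:

```python
import json

# Hypothetical stand-ins for the executor's values (assumptions, not from the PR).
command = ("airflow", "tasks", "run", "example_dag", "example_task")
carrier = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}

# Form currently in the diff: copy the command, then append twice.
appended = list(command)
appended.append("--carrier")
appended.append(json.dumps(carrier))

# Suggested form: copy and extend in a single expression.
concatenated = list(command) + ["--carrier", json.dumps(carrier)]

# Both build the same new list and leave the original command untouched.
assert appended == concatenated
assert isinstance(command, tuple)
```

Either way the result is a fresh list, so the original `command` is not mutated; the one-liner only saves two statements.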
   
   


