potiuk commented on code in PR #40802:
URL: https://github.com/apache/airflow/pull/40802#discussion_r1681479331
##########
airflow/executors/base_executor.py:
##########
@@ -299,15 +315,40 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ if span.is_recording():
+ span.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
for key, command, queue, executor_config in task_tuples:
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
- self.running.add(key)
+ task_instance = self.queued_tasks[key][3] # TaskInstance in
fourth element
+ trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))
+ span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+ links = [{"trace_id": trace_id, "span_id": span_id}]
+
+ # assuming that the span_id will very likely be unique inside the
trace
+ with Trace.start_span(
+ span_name=f"{key.dag_id}.{key.task_id}",
+ component="BaseExecutor",
+ span_id=span_id,
+ links=links,
+ ) as span:
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number - 1)
Review Comment:
I think the `try_number` off-by-one has already been fixed, so we do not need
to subtract one here (@dstandish?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]