dstandish commented on code in PR #40802:
URL: https://github.com/apache/airflow/pull/40802#discussion_r1681481796
##########
airflow/executors/base_executor.py:
##########
@@ -299,15 +315,40 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ if span.is_recording():
+ span.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
for key, command, queue, executor_config in task_tuples:
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
- self.running.add(key)
+ task_instance = self.queued_tasks[key][3] # TaskInstance in
fourth element
+ trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))
+ span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+ links = [{"trace_id": trace_id, "span_id": span_id}]
+
+ # assuming that the span_id will very likely be unique inside the
trace
+ with Trace.start_span(
+ span_name=f"{key.dag_id}.{key.task_id}",
+ component="BaseExecutor",
+ span_id=span_id,
+ links=links,
+ ) as span:
+ span.set_attribute("dag_id", key.dag_id)
+ span.set_attribute("run_id", key.run_id)
+ span.set_attribute("task_id", key.task_id)
+ span.set_attribute("try_number", key.try_number - 1)
Review Comment:
yes i think we should no longer need to make this kind of adjustment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]