ashb commented on code in PR #40802:
URL: https://github.com/apache/airflow/pull/40802#discussion_r1832724193


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -814,6 +817,60 @@ def _process_executor_events(self, executor: BaseExecutor, 
session: Session) ->
                 ti.pid,
             )
 
+            with Trace.start_span_from_taskinstance(ti=ti) as span:
+                span.set_attribute("category", "scheduler")
+                span.set_attribute("task_id", ti.task_id)
+                span.set_attribute("dag_id", ti.dag_id)
+                span.set_attribute("state", ti.state)
+                if ti.state == TaskInstanceState.FAILED:
+                    span.set_attribute("error", True)
+                span.set_attribute("start_date", str(ti.start_date))
+                span.set_attribute("end_date", str(ti.end_date))
+                span.set_attribute("duration", ti.duration)
+                span.set_attribute("executor_config", str(ti.executor_config))
+                span.set_attribute("execution_date", str(ti.execution_date))
+                span.set_attribute("hostname", ti.hostname)
+                span.set_attribute("log_url", ti.log_url)
+                span.set_attribute("operator", str(ti.operator))
+                span.set_attribute("try_number", ti.try_number - 1)
+                span.set_attribute("executor_state", state)
+                span.set_attribute("job_id", ti.job_id)
+                span.set_attribute("pool", ti.pool)
+                span.set_attribute("queue", ti.queue)
+                span.set_attribute("priority_weight", ti.priority_weight)
+                span.set_attribute("queued_dttm", str(ti.queued_dttm))
+                span.set_attribute("ququed_by_job_id", ti.queued_by_job_id)
+                span.set_attribute("pid", ti.pid)
+                if span.is_recording():
+                    span.add_event(name="queued", 
timestamp=datetime_to_nano(ti.queued_dttm))
+                    span.add_event(name="started", 
timestamp=datetime_to_nano(ti.start_date))
+                    span.add_event(name="ended", 
timestamp=datetime_to_nano(ti.end_date))
+                if conf.has_option("traces", "otel_task_log_event") and 
conf.getboolean(
+                    "traces", "otel_task_log_event"
+                ):
+                    from airflow.utils.log.log_reader import TaskLogReader
+
+                    task_log_reader = TaskLogReader()
+                    if task_log_reader.supports_read:
+                        metadata: dict[str, Any] = {}
+                        logs, metadata = task_log_reader.read_log_chunks(ti, 
ti.try_number, metadata)
+                        if ti.hostname in dict(logs[0]):
+                            message = 
str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                            while metadata["end_of_log"] is False:
+                                logs, metadata = 
task_log_reader.read_log_chunks(
+                                    ti, ti.try_number - 1, metadata
+                                )
+                                if ti.hostname in dict(logs[0]):
+                                    message = message + 
str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                            if span.is_recording():
+                                span.add_event(
+                                    name="task_log",
+                                    attributes={
+                                        "message": message,
+                                        "metadata": str(metadata),
+                                    },
+                                )

Review Comment:
   @howardyoo @ferruzzi This is a huge no-no. The scheduler cannot do any 
processing that will block the main scheduling loop, and going and reading all 
of the logs is going to block the scheduler loop.
   
   This block needs reverting, I'm afraid -- it is not a feature that can exist 
in the scheduler. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to