ashb commented on code in PR #40802:
URL: https://github.com/apache/airflow/pull/40802#discussion_r1832724193
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -814,6 +817,60 @@ def _process_executor_events(self, executor: BaseExecutor,
session: Session) ->
ti.pid,
)
+ with Trace.start_span_from_taskinstance(ti=ti) as span:
+ span.set_attribute("category", "scheduler")
+ span.set_attribute("task_id", ti.task_id)
+ span.set_attribute("dag_id", ti.dag_id)
+ span.set_attribute("state", ti.state)
+ if ti.state == TaskInstanceState.FAILED:
+ span.set_attribute("error", True)
+ span.set_attribute("start_date", str(ti.start_date))
+ span.set_attribute("end_date", str(ti.end_date))
+ span.set_attribute("duration", ti.duration)
+ span.set_attribute("executor_config", str(ti.executor_config))
+ span.set_attribute("execution_date", str(ti.execution_date))
+ span.set_attribute("hostname", ti.hostname)
+ span.set_attribute("log_url", ti.log_url)
+ span.set_attribute("operator", str(ti.operator))
+ span.set_attribute("try_number", ti.try_number - 1)
+ span.set_attribute("executor_state", state)
+ span.set_attribute("job_id", ti.job_id)
+ span.set_attribute("pool", ti.pool)
+ span.set_attribute("queue", ti.queue)
+ span.set_attribute("priority_weight", ti.priority_weight)
+ span.set_attribute("queued_dttm", str(ti.queued_dttm))
+ span.set_attribute("ququed_by_job_id", ti.queued_by_job_id)
+ span.set_attribute("pid", ti.pid)
+ if span.is_recording():
+ span.add_event(name="queued",
timestamp=datetime_to_nano(ti.queued_dttm))
+ span.add_event(name="started",
timestamp=datetime_to_nano(ti.start_date))
+ span.add_event(name="ended",
timestamp=datetime_to_nano(ti.end_date))
+ if conf.has_option("traces", "otel_task_log_event") and
conf.getboolean(
+ "traces", "otel_task_log_event"
+ ):
+ from airflow.utils.log.log_reader import TaskLogReader
+
+ task_log_reader = TaskLogReader()
+ if task_log_reader.supports_read:
+ metadata: dict[str, Any] = {}
+ logs, metadata = task_log_reader.read_log_chunks(ti,
ti.try_number, metadata)
+ if ti.hostname in dict(logs[0]):
+ message =
str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+ while metadata["end_of_log"] is False:
+ logs, metadata =
task_log_reader.read_log_chunks(
+ ti, ti.try_number - 1, metadata
+ )
+ if ti.hostname in dict(logs[0]):
+ message = message +
str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+ if span.is_recording():
+ span.add_event(
+ name="task_log",
+ attributes={
+ "message": message,
+ "metadata": str(metadata),
+ },
+ )
Review Comment:
@howardyoo @ferruzzi This is a huge no-no. The scheduler cannot do any
processing that will block the main scheduling loop for so long, and going and
reading all of the logs is going to block the scheduler loop for a noticeable
time.
This block needs reverting, I'm afraid -- it is not a feature that can exist
in the scheduler.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]