ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518064180
##########
airflow/executors/local_executor.py:
##########
@@ -261,6 +268,13 @@ def execute_async(
if TYPE_CHECKING:
assert self.executor.result_queue
+ s = Trace.get_current_span()
Review Comment:
Here and elsewhere: Please use `span` (or something descriptive, if `span` is
a bad idea) instead of `s`.
##########
airflow/jobs/job.py:
##########
@@ -181,48 +182,67 @@ def heartbeat(
:param session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
-
- try:
- # This will cause it to load from the db
- self._merge_from(Job._fetch_from_db(self, session))
- previous_heartbeat = self.latest_heartbeat
-
- if self.state == JobState.RESTARTING:
- self.kill()
-
- # Figure out how long to sleep for
- sleep_for = 0
- if self.latest_heartbeat:
- seconds_remaining = (
- self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
- )
- sleep_for = max(0, seconds_remaining)
- sleep(sleep_for)
-
- job = Job._update_heartbeat(job=self, session=session)
- self._merge_from(job)
-
- # At this point, the DB has updated.
- previous_heartbeat = self.latest_heartbeat
-
- heartbeat_callback(session)
- self.log.debug("[heartbeat]")
- except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
- if not self.heartbeat_failed:
- self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
- self.heartbeat_failed = True
- if self.is_alive():
- self.log.error(
- "%s heartbeat failed with error. Scheduler may go into
unhealthy state",
- self.__class__.__name__,
- )
- else:
- self.log.error(
- "%s heartbeat failed with error. Scheduler is in unhealthy
state", self.__class__.__name__
- )
- # We didn't manage to heartbeat, so make sure that the timestamp
isn't updated
- self.latest_heartbeat = previous_heartbeat
+ with Trace.start_span(span_name="heartbeat", component="Job") as s:
+ try:
+ s.set_attribute("heartbeat", str(self.latest_heartbeat))
+ # This will cause it to load from the db
+ self._merge_from(Job._fetch_from_db(self, session))
+ previous_heartbeat = self.latest_heartbeat
+
+ if self.state == JobState.RESTARTING:
+ self.kill()
+
+ # Figure out how long to sleep for
+ sleep_for = 0
+ if self.latest_heartbeat:
+ seconds_remaining = (
+ self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
+ )
+ sleep_for = max(0, seconds_remaining)
+ s.add_event(name="sleep()", attributes={"sleep_for":
sleep_for})
+ sleep(sleep_for)
+
+ job = Job._update_heartbeat(job=self, session=session)
+ self._merge_from(job)
+
+ # At this point, the DB has updated.
+ previous_heartbeat = self.latest_heartbeat
+
+ heartbeat_callback(session)
+ self.log.debug("[heartbeat]")
+ except OperationalError:
+ Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
+ if not self.heartbeat_failed:
+ self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
Review Comment:
Here and below, since you are using the same message for the log and the
span, consider storing the message in a variable so they stay in sync if one
ever gets changed. Example:
```
if not self.heartbeat_failed:
    msg = "%s heartbeat got an exception" % self.__class__.__name__
    self.log.exception(msg)
    self.heartbeat_failed = True
    s.add_event(name="error", attributes={"message": msg})
<... etc ...>
```
##########
airflow/executors/base_executor.py:
##########
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ s.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+ from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
+
for key, command, queue, executor_config in task_tuples:
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
- self.running.add(key)
+ qt = self.queued_tasks[key][3]
+ trace_id = int(gen_trace_id(qt.dag_run), 16) # TaskInstance in
fourth element
Review Comment:
If you are casting this return value to `int` often, consider adding a bool
"as_int" flag to the method, defaulting to False. Then this could just look
like
`trace_id = gen_trace_id(qt.dag_run, as_int=True)`
##########
airflow/jobs/job.py:
##########
@@ -181,48 +182,67 @@ def heartbeat(
:param session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
-
- try:
- # This will cause it to load from the db
- self._merge_from(Job._fetch_from_db(self, session))
- previous_heartbeat = self.latest_heartbeat
-
- if self.state == JobState.RESTARTING:
- self.kill()
-
- # Figure out how long to sleep for
- sleep_for = 0
- if self.latest_heartbeat:
- seconds_remaining = (
- self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
- )
- sleep_for = max(0, seconds_remaining)
- sleep(sleep_for)
-
- job = Job._update_heartbeat(job=self, session=session)
- self._merge_from(job)
-
- # At this point, the DB has updated.
- previous_heartbeat = self.latest_heartbeat
-
- heartbeat_callback(session)
- self.log.debug("[heartbeat]")
- except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
- if not self.heartbeat_failed:
- self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
- self.heartbeat_failed = True
- if self.is_alive():
- self.log.error(
- "%s heartbeat failed with error. Scheduler may go into
unhealthy state",
- self.__class__.__name__,
- )
- else:
- self.log.error(
- "%s heartbeat failed with error. Scheduler is in unhealthy
state", self.__class__.__name__
- )
- # We didn't manage to heartbeat, so make sure that the timestamp
isn't updated
- self.latest_heartbeat = previous_heartbeat
+ with Trace.start_span(span_name="heartbeat", component="Job") as s:
+ try:
+ s.set_attribute("heartbeat", str(self.latest_heartbeat))
+ # This will cause it to load from the db
+ self._merge_from(Job._fetch_from_db(self, session))
+ previous_heartbeat = self.latest_heartbeat
+
+ if self.state == JobState.RESTARTING:
+ self.kill()
+
+ # Figure out how long to sleep for
+ sleep_for = 0
+ if self.latest_heartbeat:
+ seconds_remaining = (
+ self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
+ )
+ sleep_for = max(0, seconds_remaining)
+ s.add_event(name="sleep()", attributes={"sleep_for":
sleep_for})
+ sleep(sleep_for)
+
+ job = Job._update_heartbeat(job=self, session=session)
+ self._merge_from(job)
+
+ # At this point, the DB has updated.
+ previous_heartbeat = self.latest_heartbeat
+
+ heartbeat_callback(session)
+ self.log.debug("[heartbeat]")
+ except OperationalError:
+ Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
Review Comment:
We're using `self.__class__.__name__` a bunch of times now, maybe it's time to
drop that into a variable to clean it up a little? Up to you, could easily be
considered out of scope.
##########
airflow/dag_processing/manager.py:
##########
@@ -1028,6 +1049,23 @@ def _collect_results_from_processor(self, processor) ->
None:
)
self._file_stats[processor.file_path] = stat
file_name = Path(processor.file_path).stem
+
+ """crude exposure of instrumentation code which may need to be
furnished"""
+ s = Trace.get_tracer("DagFileProcessorManager").start_span(
+ "dag_processing", start_time=int(processor.start_time.timestamp()
* 1000000000)
Review Comment:
You seem to be doing this "convert a datetime to integer" trick a handful of
times throughout this PR. Maybe it is worth making a
`datetime_to_int(datetime) -> int` helper in airflow/utils/dates.py?
##########
airflow/executors/base_executor.py:
##########
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ s.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+ from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
Review Comment:
Here and below: Why are we importing these locally and not at the top?
##########
airflow/executors/base_executor.py:
##########
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ s.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+ from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
Review Comment:
Why are we importing this locally and not at the top?
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1415,70 +1478,101 @@ def _schedule_dag_run(
:param dag_run: The DagRun to schedule
:return: Callback that needs to be executed
"""
- callback: DagCallbackRequest | None = None
+ trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run), 16)
+ span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run), 16)
+ links = [{"trace_id": trace_id, "span_id": span_id}]
+
+ with Trace.start_span(
+ span_name="_schedule_dag_run", component="SchedulerJobRunner",
links=links
+ ) as s:
+ s.set_attribute("dag_id", dag_run.dag_id)
+ s.set_attribute("run_id", dag_run.run_id)
+ s.set_attribute("run_type", dag_run.run_type)
+
+ callback: DagCallbackRequest | None = None
+
+ dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
+ dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+
+ if not dag or not dag_model:
+ self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
+ return callback
+
+ if (
+ dag_run.start_date
+ and dag.dagrun_timeout
+ and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+ ):
+ dag_run.set_state(DagRunState.FAILED)
+ unfinished_task_instances = session.scalars(
+ select(TI)
+ .where(TI.dag_id == dag_run.dag_id)
+ .where(TI.run_id == dag_run.run_id)
+ .where(TI.state.in_(State.unfinished))
+ )
+ for task_instance in unfinished_task_instances:
+ task_instance.state = TaskInstanceState.SKIPPED
+ session.merge(task_instance)
+ session.flush()
+ self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
+
+ if self._should_update_dag_next_dagruns(
+ dag, dag_model, last_dag_run=dag_run, session=session
+ ):
+ dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
+
+ callback_to_execute = DagCallbackRequest(
+ full_filepath=dag.fileloc,
+ dag_id=dag.dag_id,
+ run_id=dag_run.run_id,
+ is_failure_callback=True,
+ processor_subdir=dag_model.processor_subdir,
+ msg="timed_out",
+ )
- dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
- dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+ dag_run.notify_dagrun_state_changed()
+ duration = dag_run.end_date - dag_run.start_date
+ Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}",
duration)
+ Stats.timing("dagrun.duration.failed", duration,
tags={"dag_id": dag_run.dag_id})
+
+ s.set_attribute("error", True)
+ s.add_event(
+ name="error",
+ attributes={
+ "message": f"Run {dag_run.run_id} of {dag_run.dag_id}
has timed-out",
+ "duration": str(duration),
+ },
+ )
+ return callback_to_execute
- if not dag or not dag_model:
- self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
- return callback
+ if dag_run.execution_date > timezone.utcnow() and not
dag.allow_future_exec_dates:
+ self.log.error("Execution date is in future: %s",
dag_run.execution_date)
+ return callback
- if (
- dag_run.start_date
- and dag.dagrun_timeout
- and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
- ):
- dag_run.set_state(DagRunState.FAILED)
- unfinished_task_instances = session.scalars(
- select(TI)
- .where(TI.dag_id == dag_run.dag_id)
- .where(TI.run_id == dag_run.run_id)
- .where(TI.state.in_(State.unfinished))
- )
- for task_instance in unfinished_task_instances:
- task_instance.state = TaskInstanceState.SKIPPED
- session.merge(task_instance)
- session.flush()
- self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
+ if not self._verify_integrity_if_dag_changed(dag_run=dag_run,
session=session):
+ self.log.warning(
+ "The DAG disappeared before verifying integrity: %s.
Skipping.", dag_run.dag_id
+ )
+ return callback
+ # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something
else?
+ schedulable_tis, callback_to_run =
dag_run.update_state(session=session, execute_callbacks=False)
if self._should_update_dag_next_dagruns(dag, dag_model,
last_dag_run=dag_run, session=session):
dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
-
- callback_to_execute = DagCallbackRequest(
- full_filepath=dag.fileloc,
- dag_id=dag.dag_id,
- run_id=dag_run.run_id,
- is_failure_callback=True,
- processor_subdir=dag_model.processor_subdir,
- msg="timed_out",
+ # This will do one query per dag run. We "could" build up a complex
+ # query to update all the TIs across all the execution dates and
dag
+ # IDs in a single query, but it turns out that can be _very very
slow_
+ # see #11147/commit ee90807ac for more details
+ _schedulable_ti_ids = []
+ for _ti in schedulable_tis:
+ _schedulable_ti_ids.append(_ti.task_id)
Review Comment:
These three lines can be replaced with a list comprehension:
```_schedulable_ti_ids = [_ti.task_id for _ti in schedulable_tis]```
and that could be inlined in the assignment on L1571 if you wanted, instead
of storing it in a local variable at all:
```attributes={"message": "dag_run scheduling its tis", "schedulable_tis":
[_ti.task_id for _ti in schedulable_tis]}```
##########
airflow/executors/base_executor.py:
##########
@@ -287,15 +301,41 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]
task_tuples.append((key, command, queue, ti.executor_config))
+ s.add_event(
+ name="task to trigger",
+ attributes={"command": str(command), "conf":
str(ti.executor_config)},
+ )
if task_tuples:
self._process_tasks(task_tuples)
+ @span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+ from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
+
for key, command, queue, executor_config in task_tuples:
- del self.queued_tasks[key]
- self.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
- self.running.add(key)
+ qt = self.queued_tasks[key][3]
Review Comment:
It isn't entirely obvious what this is here. I assume `qt` means "queued
task" but then what's [3] getting us. Digging through the code, it looks like
this is the TaskInstance (which I see in the comment on the next line, so that
maybe could be moved up if you don't rename this or removed if you do) so
perhaps "task_instance" would be a better name than "qt".
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1415,70 +1478,101 @@ def _schedule_dag_run(
:param dag_run: The DagRun to schedule
:return: Callback that needs to be executed
"""
- callback: DagCallbackRequest | None = None
+ trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run), 16)
+ span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run), 16)
Review Comment:
In addition to another comment where I suggested an "as_int" flag, if you
are fetching them both together often, maybe consider a "get_ids" helper to do
both and return them as a tuple? Something like this in the trace_utils module
maybe:
```
def get_trace_ids(span_type: str, dag_run) -> tuple[int, int]:
    if span_type == "dag_run":
        span_id = gen_dag_span_id(dag_run=dag_run, as_int=True)
    if span_type == .....
    return gen_trace_id(dag_run=dag_run, as_int=True), span_id
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]