howardyoo commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1518471507
##########
airflow/jobs/job.py:
##########
@@ -181,48 +182,67 @@ def heartbeat(
:param session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
-
- try:
- # This will cause it to load from the db
- self._merge_from(Job._fetch_from_db(self, session))
- previous_heartbeat = self.latest_heartbeat
-
- if self.state == JobState.RESTARTING:
- self.kill()
-
- # Figure out how long to sleep for
- sleep_for = 0
- if self.latest_heartbeat:
- seconds_remaining = (
- self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
- )
- sleep_for = max(0, seconds_remaining)
- sleep(sleep_for)
-
- job = Job._update_heartbeat(job=self, session=session)
- self._merge_from(job)
-
- # At this point, the DB has updated.
- previous_heartbeat = self.latest_heartbeat
-
- heartbeat_callback(session)
- self.log.debug("[heartbeat]")
- except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
- if not self.heartbeat_failed:
- self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
- self.heartbeat_failed = True
- if self.is_alive():
- self.log.error(
- "%s heartbeat failed with error. Scheduler may go into
unhealthy state",
- self.__class__.__name__,
- )
- else:
- self.log.error(
- "%s heartbeat failed with error. Scheduler is in unhealthy
state", self.__class__.__name__
- )
- # We didn't manage to heartbeat, so make sure that the timestamp
isn't updated
- self.latest_heartbeat = previous_heartbeat
+ with Trace.start_span(span_name="heartbeat", component="Job") as s:
+ try:
+ s.set_attribute("heartbeat", str(self.latest_heartbeat))
+ # This will cause it to load from the db
+ self._merge_from(Job._fetch_from_db(self, session))
+ previous_heartbeat = self.latest_heartbeat
+
+ if self.state == JobState.RESTARTING:
+ self.kill()
+
+ # Figure out how long to sleep for
+ sleep_for = 0
+ if self.latest_heartbeat:
+ seconds_remaining = (
+ self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
+ )
+ sleep_for = max(0, seconds_remaining)
+ s.add_event(name="sleep()", attributes={"sleep_for":
sleep_for})
+ sleep(sleep_for)
+
+ job = Job._update_heartbeat(job=self, session=session)
+ self._merge_from(job)
+
+ # At this point, the DB has updated.
+ previous_heartbeat = self.latest_heartbeat
+
+ heartbeat_callback(session)
+ self.log.debug("[heartbeat]")
+ except OperationalError:
+ Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
+ if not self.heartbeat_failed:
+ self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
Review Comment:
I actually thought about that! (but was just lazy :-( )
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]