potiuk commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1151889757
##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
previous_heartbeat = self.latest_heartbeat
+ def session_merge(session):
+ session.merge(self)
+
try:
- with create_session() as session:
- # This will cause it to load from the db
- session.merge(self)
- previous_heartbeat = self.latest_heartbeat
+ self.handle_db_task(task_function = session_merge)
+ except OperationalError:
+ # We didn't manage to heartbeat, so make sure that the timestamp
isn't updated
+ self.latest_heartbeat = previous_heartbeat
+
+ previous_heartbeat = self.latest_heartbeat
- if self.state in State.terminating_states:
- self.kill()
+ if self.state in State.terminating_states:
+ self.kill()
- # Figure out how long to sleep for
- sleep_for = 0
- if self.latest_heartbeat:
- seconds_remaining = (
- self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
- )
- sleep_for = max(0, seconds_remaining)
- sleep(sleep_for)
+ # Figure out how long to sleep for
+ sleep_for = 0
+ if self.latest_heartbeat:
+ seconds_remaining = (
+ self.heartrate - (timezone.utcnow() -
self.latest_heartbeat).total_seconds()
+ )
+ sleep_for = max(0, seconds_remaining)
+ sleep(sleep_for)
- # Update last heartbeat time
- with create_session() as session:
- # Make the session aware of this object
- session.merge(self)
- self.latest_heartbeat = timezone.utcnow()
- session.commit()
- # At this point, the DB has updated.
- previous_heartbeat = self.latest_heartbeat
+ def session_merge_and_commit(session):
+ # Make the session aware of this object
+ session.merge(self)
+ self.latest_heartbeat = timezone.utcnow()
+ session.commit()
Review Comment:
I am actually now working on refactoring this part of the code in #30255,
#30302 and #30308 (and one more to follow) in order to handle AIP-44 (
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44 ) . There is already
a part there to split the method into separate steps (#30308) and I think the
discussion should happen around the time I do it there as it wil be changing
slightly the behaviour of this particular transaction (this is not yet reviewed
change so we might have more discussions there - for now I am waiting for the
#30255 and #30302 to be reviewed and merged as they are prerequisite for that
change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]