uranusjr commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1147129731


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   Hmm, the more I think about it, the more problematic this commit seems (I think there’s at least one other below). Say the database executes this commit and then hiccups during `heartbeat_callback`. The entire block would be retried, applying the update a second time. That doesn’t feel right to me. We should perhaps either
   
   a. remove this commit so the entire block shares a single transaction that gets rolled back and restarted when something goes wrong, or
   b. split this into two blocks (at the `commit` call) and retry them separately.
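
To illustrate the concern, here is a minimal sketch using a toy in-memory counter rather than Airflow's actual code. `FakeDB`, `TransientDBError`, `retry`, and the two `heartbeat_*` functions are all hypothetical stand-ins introduced for this example. The sketch assumes the callback runs inside the same retried block as the commit, which is the scenario described above. It compares that layout with option (b), where the commit and the callback are retried separately.

```python
class TransientDBError(Exception):
    """Hypothetical stand-in for a transient database failure."""


class FakeDB:
    """Toy store that only counts how many heartbeat updates were committed."""

    def __init__(self):
        self.committed_updates = 0


def retry(block, attempts=3):
    """Re-run `block` from the top whenever it raises TransientDBError."""
    for attempt in range(attempts):
        try:
            return block()
        except TransientDBError:
            if attempt == attempts - 1:
                raise


def make_flaky_callback():
    """Return a callback that fails on its first call and succeeds afterwards."""
    calls = {"n": 0}

    def callback():
        calls["n"] += 1
        if calls["n"] == 1:
            raise TransientDBError("hiccup during callback")

    return callback


def heartbeat_single_block(db, callback):
    """Commit and callback share one retried block."""

    def block():
        db.committed_updates += 1  # stands in for merge + commit
        callback()  # a failure here re-runs the commit above

    retry(block)


def heartbeat_split_blocks(db, callback):
    """Option (b): the commit and the callback are retried separately."""

    def commit_block():
        db.committed_updates += 1  # stands in for merge + commit

    retry(commit_block)
    retry(callback)  # a failure here no longer touches the commit


single = FakeDB()
heartbeat_single_block(single, make_flaky_callback())

split = FakeDB()
heartbeat_split_blocks(split, make_flaky_callback())

print(single.committed_updates, split.committed_updates)  # prints "2 1"
```

With a single retried block, one callback failure results in two committed updates. With the blocks split at the commit, the update is committed once and only the callback is retried. Option (a) would reach the same end state by a different route: with no intermediate commit, the failed attempt's update is rolled back instead of persisted.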



