potiuk commented on code in PR #30255:
URL: https://github.com/apache/airflow/pull/30255#discussion_r1149104856


##########
airflow/jobs/base_job.py:
##########
@@ -275,5 +250,41 @@ def run(self):
         Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)
         return ret
 
-    def _execute(self):
-        raise NotImplementedError("This method needs to be overridden")
+    @property
+    def job_runner(self) -> BaseJobRunner:
+        """Returns the job runner instance."""
+        return self._job_runner
+
+
+@provide_session
+def perform_heartbeat(job: BaseJob | BaseJobPydantic, only_if_necessary: bool, session=None):
+    if isinstance(job, BaseJob):
+        job.heartbeat()
+    else:
+        # TODO (potiuk): Make it works over internal API as a follow up
+        BaseJob.get(job.id, session=session).heartbeat(only_if_necessary=only_if_necessary)
+
+
+@provide_session
+def most_recent_job(job_runner: type[BaseJobRunner], session=None) -> BaseJob | None:
+    """
+    Return the most recent job of this type, if any, based on last heartbeat received.
+
+    Jobs in "running" state take precedence over others to make sure alive
+    job is returned if it is available.
+
+    :param job_runner: Job runner class to get the most recent job for
+    :param session: Database session
+    """
+    from airflow.jobs.base_job import BaseJob

Review Comment:
   Yeah. I think I can do it, but it would be much easier once #30308 is
merged. Then I think I could remove job_runner from BaseJob altogether. It's
just easier to do it via those intermediate steps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
