This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 56c0871dce Make BaseJob.most_recent_job favor "running" jobs (#28119)
56c0871dce is described below

commit 56c0871dce2fb2b7ed2252e4b2d1d8d5d0c07c58
Author: Kosteev Eugene <[email protected]>
AuthorDate: Wed Dec 7 07:48:00 2022 +0200

    Make BaseJob.most_recent_job favor "running" jobs (#28119)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/jobs/base_job.py    | 15 +++++++++++++--
 tests/jobs/test_base_job.py | 20 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 9d1b6173fd..673f035533 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from time import sleep
 
-from sqlalchemy import Column, Index, Integer, String
+from sqlalchemy import Column, Index, Integer, String, case
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.orm.session import make_transient
@@ -125,12 +125,23 @@ class BaseJob(Base, LoggingMixin):
         """
         Return the most recent job of this type, if any, based on last heartbeat received.
 
+        Jobs in "running" state take precedence over others to make sure alive
+        job is returned if it is available.
         This method should be called on a subclass (i.e. on SchedulerJob) to
         return jobs of that type.
 
         :param session: Database session
         """
-        return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()
+        return (
+            session.query(cls)
+            .order_by(
+                # Put "running" jobs at the front.
+                case({State.RUNNING: 0}, value=cls.state, else_=1),
+                cls.latest_heartbeat.desc(),
+            )
+            .limit(1)
+            .first()
+        )
 
     def is_alive(self, grace_multiplier=2.1):
         """
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 66715a9d63..6b0fbf7489 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -108,6 +108,26 @@ class TestBaseJob:
 
             session.rollback()
 
+    def test_most_recent_job_running_precedence(self):
+        with create_session() as session:
+            old_running_state_job = MockJob(None, heartrate=10)
+            old_running_state_job.latest_heartbeat = timezone.utcnow()
+            old_running_state_job.state = State.RUNNING
+            new_failed_state_job = MockJob(None, heartrate=10)
+            new_failed_state_job.latest_heartbeat = timezone.utcnow()
+            new_failed_state_job.state = State.FAILED
+            new_null_state_job = MockJob(None, heartrate=10)
+            new_null_state_job.latest_heartbeat = timezone.utcnow()
+            new_null_state_job.state = None
+            session.add(old_running_state_job)
+            session.add(new_failed_state_job)
+            session.add(new_null_state_job)
+            session.flush()
+
+            assert MockJob.most_recent_job(session=session) == old_running_state_job
+
+            session.rollback()
+
     def test_is_alive(self):
         job = MockJob(None, heartrate=10, state=State.RUNNING)
         assert job.is_alive() is True

Reply via email to