This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 56c0871dce Make BaseJob.most_recent_job favor "running" jobs (#28119)
56c0871dce is described below
commit 56c0871dce2fb2b7ed2252e4b2d1d8d5d0c07c58
Author: Kosteev Eugene <[email protected]>
AuthorDate: Wed Dec 7 07:48:00 2022 +0200
Make BaseJob.most_recent_job favor "running" jobs (#28119)
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/jobs/base_job.py | 15 +++++++++++++--
tests/jobs/test_base_job.py | 20 ++++++++++++++++++++
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 9d1b6173fd..673f035533 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from time import sleep
-from sqlalchemy import Column, Index, Integer, String
+from sqlalchemy import Column, Index, Integer, String, case
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import make_transient
@@ -125,12 +125,23 @@ class BaseJob(Base, LoggingMixin):
"""
Return the most recent job of this type, if any, based on last
heartbeat received.
+ Jobs in "running" state take precedence over others to make sure alive
+ job is returned if it is available.
This method should be called on a subclass (i.e. on SchedulerJob) to
return jobs of that type.
:param session: Database session
"""
- return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()
+ return (
+ session.query(cls)
+ .order_by(
+ # Put "running" jobs at the front.
+ case({State.RUNNING: 0}, value=cls.state, else_=1),
+ cls.latest_heartbeat.desc(),
+ )
+ .limit(1)
+ .first()
+ )
def is_alive(self, grace_multiplier=2.1):
"""
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 66715a9d63..6b0fbf7489 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -108,6 +108,26 @@ class TestBaseJob:
session.rollback()
+ def test_most_recent_job_running_precedence(self):
+ with create_session() as session:
+ old_running_state_job = MockJob(None, heartrate=10)
+ old_running_state_job.latest_heartbeat = timezone.utcnow()
+ old_running_state_job.state = State.RUNNING
+ new_failed_state_job = MockJob(None, heartrate=10)
+ new_failed_state_job.latest_heartbeat = timezone.utcnow()
+ new_failed_state_job.state = State.FAILED
+ new_null_state_job = MockJob(None, heartrate=10)
+ new_null_state_job.latest_heartbeat = timezone.utcnow()
+ new_null_state_job.state = None
+ session.add(old_running_state_job)
+ session.add(new_failed_state_job)
+ session.add(new_null_state_job)
+ session.flush()
+
+ assert MockJob.most_recent_job(session=session) == old_running_state_job
+
+ session.rollback()
+
def test_is_alive(self):
job = MockJob(None, heartrate=10, state=State.RUNNING)
assert job.is_alive() is True