This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f366d955cd Fix calculation of health check threshold for SchedulerJob 
(#31277)
f366d955cd is described below

commit f366d955cd3be551c96ad7f794e0b8525900d13d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 15 10:31:14 2023 +0200

    Fix calculation of health check threshold for SchedulerJob (#31277)
    
    The change #30302 split Job from JobRunner, but it missed the fact
    that SchedulerJob had a special case of checking the threshold -
    instead of using the standard grace multiplier, it used whatever
    has been defined in the `scheduler_health_check_threshold`. The
    `is_alive` method in SchedulerJobRunner has remained unused, and
    the default 2.1 grace multiplier has been used for both the /health
    endpoint and `airflow jobs check`.
    
    This PR brings the exception for SchedulerJob back and clarifies
    that the same threshold is also used for `airflow jobs check` in
    the documentation.
    
    Fixes: #31200
---
 airflow/config_templates/config.yml          |  3 ++-
 airflow/config_templates/default_airflow.cfg |  3 ++-
 airflow/jobs/job.py                          |  7 +++++--
 airflow/jobs/scheduler_job_runner.py         | 21 ---------------------
 newsfragments/31277.significant.rst          |  8 ++++++++
 tests/jobs/test_base_job.py                  | 23 +++++++++++++++++++++++
 6 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index a1db97ff58..37ad1dc0ae 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2294,7 +2294,8 @@ scheduler:
       description: |
         If the last scheduler heartbeat happened more than 
scheduler_health_check_threshold
         ago (in seconds), scheduler is considered unhealthy.
-        This is used by the health check in the "/health" endpoint
+        This is used by the health check in the "/health" endpoint and in 
`airflow jobs check` CLI
+        for SchedulerJob.
       version_added: 1.10.2
       type: string
       example: ~
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bb9a954c74..2e20cff91c 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1183,7 +1183,8 @@ pool_metrics_interval = 5.0
 
 # If the last scheduler heartbeat happened more than 
scheduler_health_check_threshold
 # ago (in seconds), scheduler is considered unhealthy.
-# This is used by the health check in the "/health" endpoint
+# This is used by the health check in the "/health" endpoint and in `airflow 
jobs check` CLI
+# for SchedulerJob.
 scheduler_health_check_threshold = 30
 
 # When you start a scheduler, airflow starts a tiny web server
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 9867efa9ec..399fa39f25 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -130,10 +130,13 @@ class Job(Base, LoggingMixin):
         :param grace_multiplier: multiplier of heartrate to require heart beat
             within
         """
+        if self.job_type == "SchedulerJob":
+            health_check_threshold: int = conf.getint("scheduler", 
"scheduler_health_check_threshold")
+        else:
+            health_check_threshold: int = self.heartrate * grace_multiplier
         return (
             self.state == State.RUNNING
-            and (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-            < self.heartrate * grace_multiplier
+            and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < 
health_check_threshold
         )
 
     @provide_session
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 337f317df8..1c1c2eddcf 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -264,27 +264,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         self.job.executor.debug_dump()
         self.log.info("-" * 80)
 
-    def is_alive(self, grace_multiplier: float | None = None) -> bool:
-        """
-        Whether the SchedulerJob is alive.
-
-        We define alive as in a state of running and a heartbeat within the
-        threshold defined in the ``scheduler_health_check_threshold`` config
-        setting.
-
-        ``grace_multiplier`` is accepted for compatibility with the parent 
class.
-
-        """
-        if grace_multiplier is not None:
-            # Accept the same behaviour as superclass
-            return self.job.is_alive(grace_multiplier=grace_multiplier)
-        scheduler_health_check_threshold: int = conf.getint("scheduler", 
"scheduler_health_check_threshold")
-        return (
-            self.job.state == State.RUNNING
-            and (timezone.utcnow() - self.job.latest_heartbeat).total_seconds()
-            < scheduler_health_check_threshold
-        )
-
     def __get_concurrency_maps(self, states: Iterable[TaskInstanceState], 
session: Session) -> ConcurrencyMap:
         """
         Get the concurrency maps.
diff --git a/newsfragments/31277.significant.rst 
b/newsfragments/31277.significant.rst
new file mode 100644
index 0000000000..d65329a6bc
--- /dev/null
+++ b/newsfragments/31277.significant.rst
@@ -0,0 +1,8 @@
+Clarifications of the external Health Check mechanism and using ``Job`` 
classes.
+
+In the past SchedulerJob and other ``*Job`` classes are known to have been 
used to perform
+external health checks for Airflow components. Those are, however, Airflow DB 
ORM related classes.
+The DB models and database structure of Airflow are considered as internal 
implementation detail, following
+`public interface 
<https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html>`_.
+Therefore, they should not be used for external health checks. Instead, you 
should use the
+``airflow jobs check`` CLI command (introduced in Airflow 2.1) for that 
purpose.
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index fb415e31a3..c646979237 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -148,6 +148,29 @@ class TestJob:
         job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent 
heartbeat should not be alive"
 
+    def test_is_alive_scheduler(self):
+        job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
+        assert job.is_alive() is True
+
+        job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=20)
+        assert job.is_alive() is True
+
+        # default health-check grace period for scheduler job is not 
heartrate*2.1 but 30 seconds
+        job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=21)
+        assert job.is_alive() is True
+
+        job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=31)
+        assert job.is_alive() is False
+
+        # test because .seconds was used before instead of total_seconds
+        # internal repr of datetime is (days, seconds)
+        job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
+        assert job.is_alive() is False
+
+        job.state = State.SUCCESS
+        job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=10)
+        assert job.is_alive() is False, "Completed jobs even with recent 
heartbeat should not be alive"
+
     @patch("airflow.jobs.job.create_session")
     def test_heartbeat_failed(self, mock_create_session):
         when = timezone.utcnow() - datetime.timedelta(seconds=60)

Reply via email to