This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f366d955cd Fix calculation of health check threshold for SchedulerJob
(#31277)
f366d955cd is described below
commit f366d955cd3be551c96ad7f794e0b8525900d13d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 15 10:31:14 2023 +0200
Fix calculation of health check threshold for SchedulerJob (#31277)
The change #30302 split Job from JobRunner, but it missed the fact
that SchedulerJob had a special case of checking the threshold -
instead of using the standard grace multiplier, it used whatever
has been defined in the `scheduler_health_check_threshold`. The
`is_alive` method in SchedulerJobRunner has remained unused, and
the default 2.1 grace multiplier has been used for both the /health
endpoint and `airflow jobs check`.
This PR brings the exception for SchedulerJob back and clarifies
in the documentation that the same threshold is also used for
`airflow jobs check`.
Fixes: #31200
---
airflow/config_templates/config.yml | 3 ++-
airflow/config_templates/default_airflow.cfg | 3 ++-
airflow/jobs/job.py | 7 +++++--
airflow/jobs/scheduler_job_runner.py | 21 ---------------------
newsfragments/31277.significant.rst | 8 ++++++++
tests/jobs/test_base_job.py | 23 +++++++++++++++++++++++
6 files changed, 40 insertions(+), 25 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index a1db97ff58..37ad1dc0ae 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2294,7 +2294,8 @@ scheduler:
description: |
If the last scheduler heartbeat happened more than
scheduler_health_check_threshold
ago (in seconds), scheduler is considered unhealthy.
- This is used by the health check in the "/health" endpoint
+ This is used by the health check in the "/health" endpoint and in
`airflow jobs check` CLI
+ for SchedulerJob.
version_added: 1.10.2
type: string
example: ~
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index bb9a954c74..2e20cff91c 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1183,7 +1183,8 @@ pool_metrics_interval = 5.0
# If the last scheduler heartbeat happened more than
scheduler_health_check_threshold
# ago (in seconds), scheduler is considered unhealthy.
-# This is used by the health check in the "/health" endpoint
+# This is used by the health check in the "/health" endpoint and in `airflow
jobs check` CLI
+# for SchedulerJob.
scheduler_health_check_threshold = 30
# When you start a scheduler, airflow starts a tiny web server
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 9867efa9ec..399fa39f25 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -130,10 +130,13 @@ class Job(Base, LoggingMixin):
:param grace_multiplier: multiplier of heartrate to require heart beat
within
"""
+ if self.job_type == "SchedulerJob":
+ health_check_threshold: int = conf.getint("scheduler",
"scheduler_health_check_threshold")
+ else:
+ health_check_threshold: int = self.heartrate * grace_multiplier
return (
self.state == State.RUNNING
- and (timezone.utcnow() - self.latest_heartbeat).total_seconds()
- < self.heartrate * grace_multiplier
+ and (timezone.utcnow() - self.latest_heartbeat).total_seconds() <
health_check_threshold
)
@provide_session
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 337f317df8..1c1c2eddcf 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -264,27 +264,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.job.executor.debug_dump()
self.log.info("-" * 80)
- def is_alive(self, grace_multiplier: float | None = None) -> bool:
- """
- Whether the SchedulerJob is alive.
-
- We define alive as in a state of running and a heartbeat within the
- threshold defined in the ``scheduler_health_check_threshold`` config
- setting.
-
- ``grace_multiplier`` is accepted for compatibility with the parent
class.
-
- """
- if grace_multiplier is not None:
- # Accept the same behaviour as superclass
- return self.job.is_alive(grace_multiplier=grace_multiplier)
- scheduler_health_check_threshold: int = conf.getint("scheduler",
"scheduler_health_check_threshold")
- return (
- self.job.state == State.RUNNING
- and (timezone.utcnow() - self.job.latest_heartbeat).total_seconds()
- < scheduler_health_check_threshold
- )
-
def __get_concurrency_maps(self, states: Iterable[TaskInstanceState],
session: Session) -> ConcurrencyMap:
"""
Get the concurrency maps.
diff --git a/newsfragments/31277.significant.rst
b/newsfragments/31277.significant.rst
new file mode 100644
index 0000000000..d65329a6bc
--- /dev/null
+++ b/newsfragments/31277.significant.rst
@@ -0,0 +1,8 @@
+Clarifications of the external Health Check mechanism and using ``Job``
classes.
+
+In the past SchedulerJob and other ``*Job`` classes are known to have been
used to perform
+external health checks for Airflow components. Those are, however, Airflow DB
ORM related classes.
+The DB models and database structure of Airflow are considered an internal
implementation detail, following the
+`public interface
<https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html>`_.
+Therefore, they should not be used for external health checks. Instead, you
should use the
+``airflow jobs check`` CLI command (introduced in Airflow 2.1) for that
purpose.
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index fb415e31a3..c646979237 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -148,6 +148,29 @@ class TestJob:
job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=10)
assert job.is_alive() is False, "Completed jobs even with recent
heartbeat should not be alive"
+ def test_is_alive_scheduler(self):
+ job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
+ assert job.is_alive() is True
+
+ job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=20)
+ assert job.is_alive() is True
+
+ # default health-check grace period for scheduler job is not
heartrate*2.1 but 30 seconds
+ job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=21)
+ assert job.is_alive() is True
+
+ job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=31)
+ assert job.is_alive() is False
+
+ # test because .seconds was used before instead of total_seconds
+ # internal repr of datetime is (days, seconds)
+ job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
+ assert job.is_alive() is False
+
+ job.state = State.SUCCESS
+ job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=10)
+ assert job.is_alive() is False, "Completed jobs even with recent
heartbeat should not be alive"
+
@patch("airflow.jobs.job.create_session")
def test_heartbeat_failed(self, mock_create_session):
when = timezone.utcnow() - datetime.timedelta(seconds=60)