This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c866f49e5 recovery message (#34457)
6c866f49e5 is described below
commit 6c866f49e536051d603c5bc20bc27308cf3804c1
Author: Bowrna <[email protected]>
AuthorDate: Sat Apr 6 05:00:27 2024 +0530
recovery message (#34457)
---
airflow/jobs/job.py | 47 +++++++++++++++++++----------------
airflow/serialization/pydantic/job.py | 9 +++----
tests/jobs/test_base_job.py | 10 +++++---
3 files changed, 36 insertions(+), 30 deletions(-)
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 99ca45a3e9..b1e41499dc 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from functools import cached_property
+from functools import cached_property, lru_cache
from time import sleep
from typing import TYPE_CHECKING, Callable, NoReturn
@@ -56,6 +56,19 @@ def _resolve_dagrun_model():
return DagRun
+@lru_cache
+def health_check_threshold(job_type: str, heartrate: int) -> int | float:
+ grace_multiplier = 2.1
+ health_check_threshold_value: int | float
+ if job_type == "SchedulerJob":
+ health_check_threshold_value = conf.getint("scheduler",
"scheduler_health_check_threshold")
+ elif job_type == "TriggererJob":
+ health_check_threshold_value = conf.getfloat("triggerer",
"triggerer_health_check_threshold")
+ else:
+ health_check_threshold_value = heartrate * grace_multiplier
+ return health_check_threshold_value
+
+
class Job(Base, LoggingMixin):
"""
The ORM class representing Job stored in the database.
@@ -112,6 +125,7 @@ class Job(Base, LoggingMixin):
self.executor = executor
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
+ self.previous_heartbeat = None
if heartrate is not None:
self.heartrate = heartrate
self.unixname = getuser()
@@ -131,22 +145,18 @@ class Job(Base, LoggingMixin):
def heartrate(self) -> float:
return Job._heartrate(self.job_type)
- def is_alive(self, grace_multiplier=2.1) -> bool:
+ def is_alive(self) -> bool:
"""
Is this job currently alive.
We define alive as in a state of RUNNING, and having sent a heartbeat
within a multiple of the heartrate (default of 2.1)
-
- :param grace_multiplier: multiplier of heartrate to require heart beat
- within
"""
+ threshold_value = health_check_threshold(self.job_type, self.heartrate)
return Job._is_alive(
- job_type=self.job_type,
- heartrate=self.heartrate,
state=self.state,
+ health_check_threshold_value=threshold_value,
latest_heartbeat=self.latest_heartbeat,
- grace_multiplier=grace_multiplier,
)
@provide_session
@@ -206,16 +216,20 @@ class Job(Base, LoggingMixin):
job = Job._update_heartbeat(job=self, session=session)
self._merge_from(job)
-
+ time_since_last_heartbeat = (timezone.utcnow() -
previous_heartbeat).total_seconds()
+ health_check_threshold_value =
health_check_threshold(self.job_type, self.heartrate)
+ if time_since_last_heartbeat > health_check_threshold_value:
+ self.log.info("Heartbeat recovered after %.2f seconds",
time_since_last_heartbeat)
# At this point, the DB has updated.
previous_heartbeat = self.latest_heartbeat
heartbeat_callback(session)
self.log.debug("[heartbeat]")
+ self.heartbeat_failed = False
except OperationalError:
Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
if not self.heartbeat_failed:
- self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
+ self.log.exception("%s heartbeat failed with error",
self.__class__.__name__)
self.heartbeat_failed = True
if self.is_alive():
self.log.error(
@@ -278,22 +292,13 @@ class Job(Base, LoggingMixin):
@staticmethod
def _is_alive(
- job_type: str | None,
- heartrate: float,
state: JobState | str | None,
+ health_check_threshold_value: float | int,
latest_heartbeat: datetime.datetime,
- grace_multiplier: float = 2.1,
) -> bool:
- health_check_threshold: float
- if job_type == "SchedulerJob":
- health_check_threshold = conf.getint("scheduler",
"scheduler_health_check_threshold")
- elif job_type == "TriggererJob":
- health_check_threshold = conf.getint("triggerer",
"triggerer_health_check_threshold")
- else:
- health_check_threshold = heartrate * grace_multiplier
return (
state == JobState.RUNNING
- and (timezone.utcnow() - latest_heartbeat).total_seconds() <
health_check_threshold
+ and (timezone.utcnow() - latest_heartbeat).total_seconds() <
health_check_threshold_value
)
@staticmethod
diff --git a/airflow/serialization/pydantic/job.py b/airflow/serialization/pydantic/job.py
index 7aec389ba9..bea8e9a3af 100644
--- a/airflow/serialization/pydantic/job.py
+++ b/airflow/serialization/pydantic/job.py
@@ -42,6 +42,7 @@ class JobPydantic(BaseModelPydantic):
executor_class: Optional[str]
hostname: Optional[str]
unixname: Optional[str]
+ grace_multiplier: float = 2.1
model_config = ConfigDict(from_attributes=True)
@@ -57,14 +58,12 @@ class JobPydantic(BaseModelPydantic):
assert self.job_type is not None
return Job._heartrate(self.job_type)
- def is_alive(self, grace_multiplier=2.1) -> bool:
+ def is_alive(self) -> bool:
"""Is this job currently alive."""
- from airflow.jobs.job import Job
+ from airflow.jobs.job import Job, health_check_threshold
return Job._is_alive(
- job_type=self.job_type,
- heartrate=self.heartrate,
state=self.state,
+ health_check_threshold_value=health_check_threshold(self.job_type,
self.heartrate),
latest_heartbeat=self.latest_heartbeat,
- grace_multiplier=grace_multiplier,
)
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index a7eb4932bf..fcbb84fef7 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+import logging
import sys
from unittest.mock import ANY, Mock, patch
@@ -207,16 +208,17 @@ class TestJob:
job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=10)
assert job.is_alive() is False, "Completed jobs even with recent
heartbeat should not be alive"
- def test_heartbeat_failed(self):
+ def test_heartbeat_failed(self, caplog):
when = timezone.utcnow() - datetime.timedelta(seconds=60)
mock_session = Mock(name="MockSession")
mock_session.commit.side_effect = OperationalError("Force fail", {},
None)
job = Job(heartrate=10, state=State.RUNNING)
job.latest_heartbeat = when
-
- job.heartbeat(heartbeat_callback=lambda: None, session=mock_session)
-
+ with caplog.at_level(logging.ERROR):
+ job.heartbeat(heartbeat_callback=lambda: None,
session=mock_session)
+ assert "heartbeat failed with error" in caplog.text
assert job.latest_heartbeat == when, "attribute not updated when
heartbeat fails"
+ assert job.heartbeat_failed
@conf_vars(
{