This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8c4f10427cac2c679bdab5063d1fde84c88a5f5b
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Mar 8 14:31:46 2024 +0100

    Fix a bug where scheduler heartrate parameter were not used (#37992)
    
    Sinc #30255 scheduler heartrate has not been properly calculated.
    We missed the check for SchedulerJob type and setting heartrate
    value from `scheduler_health_check_threshold`.
    
    This PR fixes it.
    
    Fix: #37971
    (cherry picked from commit 01e40abc759a219cc359fab6ca73434cca55901a)
---
 airflow/jobs/job.py                  |  2 ++
 airflow/jobs/scheduler_job_runner.py |  1 -
 tests/jobs/test_base_job.py          |  9 ++++++---
 tests/jobs/test_scheduler_job.py     | 10 ++++++++++
 4 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 0afbb2a026..0bd81e0c80 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -264,6 +264,8 @@ class Job(Base, LoggingMixin):
     def _heartrate(job_type: str) -> float:
         if job_type == "TriggererJob":
             return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
+        elif job_type == "SchedulerJob":
+            return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC")
         else:
             # Heartrate used to be hardcoded to scheduler, so in all other
             # cases continue to use that value for back compat
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index bf5a28cb5b..eddd742571 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -151,7 +151,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     """
 
     job_type = "SchedulerJob"
-    heartrate: int = conf.getint("scheduler", "SCHEDULER_HEARTBEAT_SEC")
 
     def __init__(
         self,
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 8f7237ffc6..62e0369791 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -97,9 +97,12 @@ class TestJob:
     )
     def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, 
job_heartbeat_sec):
         """Ensure heartrate is set correctly after jobs are queried from the 
DB"""
-        with create_session() as session, conf_vars(
-            {(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}
-        ):
+        if job_type == "scheduler":
+            config_name = "scheduler_heartbeat_sec"
+        else:
+            config_name = "job_heartbeat_sec"
+
+        with create_session() as session, conf_vars({(job_type.lower(), 
config_name): job_heartbeat_sec}):
             job = Job()
             job_runner(job=job)
             session.add(job)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8e745e1e08..e5de64ee9d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -193,6 +193,16 @@ class TestSchedulerJob:
                 not scheduler_job.is_alive()
             ), "Completed jobs even with recent heartbeat should not be alive"
 
+    @pytest.mark.parametrize(
+        "heartrate",
+        [10, 5],
+    )
+    def test_heartrate(self, heartrate):
+        with conf_vars({("scheduler", "scheduler_heartbeat_sec"): 
str(heartrate)}):
+            scheduler_job = Job(executor=self.null_exec)
+            _ = SchedulerJobRunner(job=scheduler_job)
+            assert scheduler_job.heartrate == heartrate
+
     def run_single_scheduler_loop_with_no_dags(self, dags_folder):
         """
         Utility function that runs a single scheduler loop without actually

Reply via email to