This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 63789436209db9ab0f612581381d46bb18eb8f82
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Aug 9 00:29:43 2023 -0700

    Don't just default to scheduler heartbeat in jobs (#33084)
    
    (cherry picked from commit c39359eafb7da21c098c07202c2bfe155c1db9ed)
---
 airflow/config_templates/unit_tests.cfg |  7 +++++++
 airflow/jobs/job.py                     | 11 +++++++++--
 tests/jobs/test_base_job.py             | 33 ++++++++++++++++++++++++++++++++-
 tests/utils/test_helpers.py             |  8 ++++++++
 4 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/airflow/config_templates/unit_tests.cfg b/airflow/config_templates/unit_tests.cfg
index 29d422704e..69c2d65bba 100644
--- a/airflow/config_templates/unit_tests.cfg
+++ b/airflow/config_templates/unit_tests.cfg
@@ -92,6 +92,13 @@ scheduler_heartbeat_sec = 5
 parsing_processes = 2
 dag_dir_list_interval = 0
 
+[triggerer]
+# Those values are set so that during unit tests things run faster than usual.
+# Triggerer heartbeat intentionally different from scheduler to catch bad assumptions in code
+# that they are the same which by default they are in production but they can be configured differently
+# by users.
+job_heartbeat_sec = 2
+
 [example_section]
 # This section is used to test coercions of configuration values retrieval
 string_value = 21600
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index bf404fc821..264eed15aa 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -97,8 +97,6 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    heartrate = conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")
-
     def __init__(self, executor=None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.hostname = get_hostname()
@@ -117,6 +115,15 @@ class Job(Base, LoggingMixin):
     def executor(self):
         return ExecutorLoader.get_default_executor()
 
+    @cached_property
+    def heartrate(self):
+        if self.job_type == "TriggererJob":
+            return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
+        else:
+            # Heartrate used to be hardcoded to scheduler, so in all other
+            # cases continue to use that value for back compat
+            return conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")
+
     def is_alive(self, grace_multiplier=2.1):
         """
         Is this job currently alive.
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index ee8e8e0b4f..998cc3bebe 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -32,7 +32,7 @@ from airflow.utils.session import create_session
 from airflow.utils.state import State
 from tests.listeners import lifecycle_listener
 from tests.test_utils.config import conf_vars
-from tests.utils.test_helpers import MockJobRunner
+from tests.utils.test_helpers import MockJobRunner, SchedulerJobRunner, TriggererJobRunner
 
 
 class TestJob:
@@ -90,6 +90,37 @@ class TestJob:
         assert job.state == State.FAILED
         assert job.end_date is not None
 
+    @pytest.mark.parametrize(
+        "job_runner, job_type,job_heartbeat_sec",
+        [(SchedulerJobRunner, "scheduler", "11"), (TriggererJobRunner, "triggerer", "9")],
+    )
+    def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, job_heartbeat_sec):
+        """Ensure heartrate is set correctly after jobs are queried from the DB"""
+        with create_session() as session, conf_vars(
+            {(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}
+        ):
+            job = Job()
+            job_runner(job=job)
+            session.add(job)
+            session.flush()
+
+            most_recent = most_recent_job(job_runner.job_type, session=session)
+            assert most_recent.heartrate == float(job_heartbeat_sec)
+
+            session.rollback()
+
+    @pytest.mark.parametrize(
+        "job_runner, job_type,job_heartbeat_sec",
+        [(SchedulerJobRunner, "scheduler", "11"), (TriggererJobRunner, "triggerer", "9")],
+    )
+    def test_heart_rate_via_constructor_persists(self, job_runner, job_type, job_heartbeat_sec):
+        """Ensure heartrate passed via constructor is set correctly"""
+        with conf_vars({(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}):
+            job = Job(heartrate=12)
+            job_runner(job)
+            # heartrate should be 12 since we passed that to the constructor directly
+            assert job.heartrate == 12
+
     def test_most_recent_job(self):
         with create_session() as session:
             old_job = Job(heartrate=10)
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 3b72768fc7..9d6020874c 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -343,3 +343,11 @@ class MockJobRunner(BaseJobRunner):
         if self.func is not None:
             return self.func()
         return None
+
+
+class SchedulerJobRunner(MockJobRunner):
+    job_type = "SchedulerJob"
+
+
+class TriggererJobRunner(MockJobRunner):
+    job_type = "TriggererJob"

Reply via email to