This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fdb96229b61c3e380bb84886cdc757a0e83b2c2a
Author: Niko Oliveira <[email protected]>
AuthorDate: Sun Aug 6 15:25:53 2023 -0700

    Configurable health check threshold for triggerer (#33089)
    
    Recently the Triggerer was forced to a health check threshold of
    `trigger_heartbeat * 2.1`. With the default heartbeat of 5s this yields a
    threshold of 10.5s, whereas the previous threshold was 30s. This leads to
    a very unstable situation where Triggerers are not given a reasonable
    amount of time to heartbeat and their triggers are taken from them.
    
    This change allows the user to configure this threshold through the new
    `[triggerer] triggerer_health_check_threshold` option (default 30s), the
    same way we do for the scheduler.
    
    (cherry picked from commit 6ec3b9abb67617ebbea2129421c05e45dc863bee)
---
 airflow/config_templates/config.yml  | 10 +++++
 airflow/jobs/job.py                  |  2 +
 airflow/jobs/triggerer_job_runner.py |  4 +-
 airflow/models/trigger.py            | 14 +++---
 tests/jobs/test_base_job.py          |  9 ++--
 tests/models/test_trigger.py         | 86 +++++++++++-------------------------
 6 files changed, 53 insertions(+), 72 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 28c1942041..8b3188c062 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2311,6 +2311,16 @@ triggerer:
       type: float
       example: ~
       default: "5"
+    triggerer_health_check_threshold:
+      description: |
+        If the last triggerer heartbeat happened more than triggerer_health_check_threshold
+        seconds ago, the triggerer is considered unhealthy and its triggers may be reassigned.
+        This is used by the health check in the "/health" endpoint and in the `airflow jobs check` CLI
+        for TriggererJob.
+      version_added: 2.7.0
+      type: float
+      example: ~
+      default: "30"
 kerberos:
   description: ~
   options:
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index add43e0aa5..bf404fc821 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -129,6 +129,8 @@ class Job(Base, LoggingMixin):
         """
         if self.job_type == "SchedulerJob":
             health_check_threshold: int = conf.getint("scheduler", 
"scheduler_health_check_threshold")
+        elif self.job_type == "TriggererJob":
+            health_check_threshold: int = conf.getint("triggerer", 
"triggerer_health_check_threshold")
         else:
             health_check_threshold: int = self.heartrate * grace_multiplier
         return (
diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 9fcc8d14d9..9f8dac6184 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -257,6 +257,8 @@ class TriggererJobRunner(BaseJobRunner["Job | 
JobPydantic"], LoggingMixin):
         else:
             raise ValueError(f"Capacity number {capacity} is invalid")
 
+        self.health_check_threshold = conf.getint("triggerer", 
"triggerer_health_check_threshold")
+
         should_queue = True
         if DISABLE_WRAPPER:
             self.log.warning(
@@ -363,7 +365,7 @@ class TriggererJobRunner(BaseJobRunner["Job | 
JobPydantic"], LoggingMixin):
 
     def load_triggers(self):
         """Query the database for the triggers we're supposed to be running 
and update the runner."""
-        Trigger.assign_unassigned(self.job.id, self.capacity, 
self.job.heartrate)
+        Trigger.assign_unassigned(self.job.id, self.capacity, 
self.health_check_threshold)
         ids = Trigger.ids_for_triggerer(self.job.id)
         self.trigger_runner.update_triggers(set(ids))
 
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index bba60c9449..0161d5c9fe 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -199,13 +199,15 @@ class Trigger(Base):
     @classmethod
     @internal_api_call
     @provide_session
-    def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: 
Session = NEW_SESSION) -> None:
+    def assign_unassigned(
+        cls, triggerer_id, capacity, health_check_threshold, session: Session 
= NEW_SESSION
+    ) -> None:
         """
         Assign unassigned triggers based on a number of conditions.
 
-        Takes a triggerer_id, the capacity for that triggerer and the 
Triggerer job heartrate,
-        and assigns unassigned triggers until that capacity is reached, or 
there are no more
-        unassigned triggers.
+        Takes a triggerer_id, the capacity for that triggerer and the Triggerer job
+        health check threshold, and assigns unassigned triggers until that capacity is reached,
+        or there are no more unassigned triggers.
         """
         from airflow.jobs.job import Job  # To avoid circular import
 
@@ -214,9 +216,7 @@ class Trigger(Base):
 
         if capacity <= 0:
             return
-        # we multiply heartrate by a grace_multiplier to give the triggerer
-        # a chance to heartbeat before we consider it dead
-        health_check_threshold = heartrate * 2.1
+
         alive_triggerer_ids = session.scalars(
             select(Job.id).where(
                 Job.end_date.is_(None),
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 0e36bd4571..ee8e8e0b4f 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -21,7 +21,7 @@ import datetime
 import sys
 from unittest.mock import ANY, Mock, patch
 
-from pytest import raises
+import pytest
 from sqlalchemy.exc import OperationalError
 
 from airflow.executors.sequential_executor import SequentialExecutor
@@ -84,7 +84,7 @@ class TestJob:
 
         job = Job()
         job_runner = MockJobRunner(job=job, func=abort)
-        with raises(RuntimeError):
+        with pytest.raises(RuntimeError):
             run_job(job=job, execute_callable=job_runner._execute)
 
         assert job.state == State.FAILED
@@ -148,8 +148,9 @@ class TestJob:
         job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent 
heartbeat should not be alive"
 
-    def test_is_alive_scheduler(self):
-        job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
+    @pytest.mark.parametrize("job_type", ["SchedulerJob", "TriggererJob"])
+    def test_is_alive_scheduler(self, job_type):
+        job = Job(heartrate=10, state=State.RUNNING, job_type=job_type)
         assert job.is_alive() is True
 
         job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=20)
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 0c5e5eeadc..98c570c935 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -141,10 +141,11 @@ def test_assign_unassigned(session, create_task_instance):
     """
     Tests that unassigned triggers of all appropriate states are assigned.
     """
+    time_now = timezone.utcnow()
     triggerer_heartrate = 10
     finished_triggerer = Job(heartrate=triggerer_heartrate, 
state=State.SUCCESS)
     TriggererJobRunner(finished_triggerer)
-    finished_triggerer.end_date = timezone.utcnow() - 
datetime.timedelta(hours=1)
+    finished_triggerer.end_date = time_now - datetime.timedelta(hours=1)
     session.add(finished_triggerer)
     assert not finished_triggerer.is_alive()
     healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
@@ -155,22 +156,37 @@ def test_assign_unassigned(session, create_task_instance):
     TriggererJobRunner(new_triggerer)
     session.add(new_triggerer)
     assert new_triggerer.is_alive()
+    # This triggerer's last heartbeat is older than the health check threshold, so
+    # expect its triggers to be taken by another healthy triggerer below
+    unhealthy_triggerer = Job(
+        heartrate=triggerer_heartrate,
+        state=State.RUNNING,
+        latest_heartbeat=time_now - datetime.timedelta(seconds=100),
+    )
+    TriggererJobRunner(unhealthy_triggerer)
+    session.add(unhealthy_triggerer)
+    # Triggerer is not healthy: its last heartbeat was too long ago
+    assert not unhealthy_triggerer.is_alive()
     session.commit()
     trigger_on_healthy_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
     trigger_on_healthy_triggerer.id = 1
     trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+    trigger_on_unhealthy_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_unhealthy_triggerer.id = 2
+    trigger_on_unhealthy_triggerer.triggerer_id = unhealthy_triggerer.id
     trigger_on_killed_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-    trigger_on_killed_triggerer.id = 2
+    trigger_on_killed_triggerer.id = 3
     trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
     trigger_unassigned_to_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-    trigger_unassigned_to_triggerer.id = 3
+    trigger_unassigned_to_triggerer.id = 4
     assert trigger_unassigned_to_triggerer.triggerer_id is None
     session.add(trigger_on_healthy_triggerer)
+    session.add(trigger_on_unhealthy_triggerer)
     session.add(trigger_on_killed_triggerer)
     session.add(trigger_unassigned_to_triggerer)
     session.commit()
-    assert session.query(Trigger).count() == 3
-    Trigger.assign_unassigned(new_triggerer.id, 100, session=session, 
heartrate=triggerer_heartrate)
+    assert session.query(Trigger).count() == 4
+    Trigger.assign_unassigned(new_triggerer.id, 100, health_check_threshold=30)
     session.expire_all()
     # Check that trigger on killed triggerer and unassigned trigger are 
assigned to new triggerer
     assert (
@@ -186,61 +202,11 @@ def test_assign_unassigned(session, create_task_instance):
         session.query(Trigger).filter(Trigger.id == 
trigger_on_healthy_triggerer.id).one().triggerer_id
         == healthy_triggerer.id
     )
-
-
[email protected]("check_triggerer_heartrate", [10, 60, 300])
-def test_assign_unassigned_missing_heartbeat(session, create_task_instance, 
check_triggerer_heartrate):
-    """
-    Tests that the triggers assigned to a dead triggers are considered as 
unassigned
-    and they are  assigned to an alive triggerer.
-    """
-    import time_machine
-
-    block_triggerer_heartrate = 9999
-    with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t:
-        first_triggerer = Job(heartrate=block_triggerer_heartrate, 
state=State.RUNNING)
-        TriggererJobRunner(first_triggerer)
-        session.add(first_triggerer)
-        assert first_triggerer.is_alive()
-        second_triggerer = Job(heartrate=block_triggerer_heartrate, 
state=State.RUNNING)
-        TriggererJobRunner(second_triggerer)
-        session.add(second_triggerer)
-        assert second_triggerer.is_alive()
-        session.commit()
-        trigger_on_first_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-        trigger_on_first_triggerer.id = 1
-        trigger_on_first_triggerer.triggerer_id = first_triggerer.id
-        trigger_on_second_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-        trigger_on_second_triggerer.id = 2
-        trigger_on_second_triggerer.triggerer_id = second_triggerer.id
-        session.add(trigger_on_first_triggerer)
-        session.add(trigger_on_second_triggerer)
-        session.commit()
-        assert session.query(Trigger).count() == 2
-        triggers_ids = [
-            (first_triggerer.id, second_triggerer.id),
-            (first_triggerer.id, second_triggerer.id),
-            (first_triggerer.id, second_triggerer.id),
-            # Check that after more than 2.1 heartrates, the first triggerer 
is considered dead
-            # and the first trigger is assigned to the second triggerer
-            (second_triggerer.id, second_triggerer.id),
-        ]
-        for i in range(4):
-            Trigger.assign_unassigned(
-                second_triggerer.id, 100, session=session, 
heartrate=check_triggerer_heartrate
-            )
-            session.expire_all()
-            # Check that trigger on killed triggerer and unassigned trigger 
are assigned to new triggerer
-            assert (
-                session.query(Trigger).filter(Trigger.id == 
trigger_on_first_triggerer.id).one().triggerer_id
-                == triggers_ids[i][0]
-            )
-            assert (
-                session.query(Trigger).filter(Trigger.id == 
trigger_on_second_triggerer.id).one().triggerer_id
-                == triggers_ids[i][1]
-            )
-            t.shift(datetime.timedelta(seconds=check_triggerer_heartrate))
-            second_triggerer.latest_heartbeat += 
datetime.timedelta(seconds=check_triggerer_heartrate)
+    # Check that trigger on unhealthy triggerer is assigned to new triggerer
+    assert (
+        session.query(Trigger).filter(Trigger.id == 
trigger_on_unhealthy_triggerer.id).one().triggerer_id
+        == new_triggerer.id
+    )
 
 
 def test_get_sorted_triggers(session, create_task_instance):

Reply via email to