This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ec3b9abb6 Configurable health check threshold for triggerer (#33089)
6ec3b9abb6 is described below

commit 6ec3b9abb67617ebbea2129421c05e45dc863bee
Author: Niko Oliveira <[email protected]>
AuthorDate: Sun Aug 6 15:25:53 2023 -0700

    Configurable health check threshold for triggerer (#33089)
    
    Recently the Triggerer was forced to a health check threshold of
    `trigger_heartbeat * 2.1`. With a default of 5s for the heartbeat, this
    generates a threshold of 10.5s, whereas the previous threshold was 30s.
    This leads to a very unstable situation where Triggerers are not given a
    reasonable amount of time to heartbeat and their triggers are taken
    from them.
    
    This change allows the user to configure this threshold the same way we
    do for the scheduler.
---
 airflow/config_templates/config.yml  | 10 +++++
 airflow/jobs/job.py                  |  2 +
 airflow/jobs/triggerer_job_runner.py |  4 +-
 airflow/models/trigger.py            | 14 +++---
 tests/jobs/test_base_job.py          |  9 ++--
 tests/models/test_trigger.py         | 86 +++++++++++-------------------------
 6 files changed, 53 insertions(+), 72 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index bad34da8d5..f868a6c6ea 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2258,6 +2258,16 @@ triggerer:
       type: float
       example: ~
       default: "5"
+    triggerer_health_check_threshold:
+      description: |
+        If the last triggerer heartbeat happened more than 
triggerer_health_check_threshold
+        ago (in seconds), triggerer is considered unhealthy.
+        This is used by the health check in the "/health" endpoint and in 
`airflow jobs check` CLI
+        for TriggererJob.
+      version_added: 2.7.0
+      type: float
+      example: ~
+      default: "30"
 kerberos:
   description: ~
   options:
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index add43e0aa5..bf404fc821 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -129,6 +129,8 @@ class Job(Base, LoggingMixin):
         """
         if self.job_type == "SchedulerJob":
             health_check_threshold: int = conf.getint("scheduler", 
"scheduler_health_check_threshold")
+        elif self.job_type == "TriggererJob":
+            health_check_threshold: int = conf.getint("triggerer", 
"triggerer_health_check_threshold")
         else:
             health_check_threshold: int = self.heartrate * grace_multiplier
         return (
diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 9fcc8d14d9..9f8dac6184 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -257,6 +257,8 @@ class TriggererJobRunner(BaseJobRunner["Job | 
JobPydantic"], LoggingMixin):
         else:
             raise ValueError(f"Capacity number {capacity} is invalid")
 
+        self.health_check_threshold = conf.getint("triggerer", 
"triggerer_health_check_threshold")
+
         should_queue = True
         if DISABLE_WRAPPER:
             self.log.warning(
@@ -363,7 +365,7 @@ class TriggererJobRunner(BaseJobRunner["Job | 
JobPydantic"], LoggingMixin):
 
     def load_triggers(self):
         """Query the database for the triggers we're supposed to be running 
and update the runner."""
-        Trigger.assign_unassigned(self.job.id, self.capacity, 
self.job.heartrate)
+        Trigger.assign_unassigned(self.job.id, self.capacity, 
self.health_check_threshold)
         ids = Trigger.ids_for_triggerer(self.job.id)
         self.trigger_runner.update_triggers(set(ids))
 
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index bba60c9449..0161d5c9fe 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -199,13 +199,15 @@ class Trigger(Base):
     @classmethod
     @internal_api_call
     @provide_session
-    def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: 
Session = NEW_SESSION) -> None:
+    def assign_unassigned(
+        cls, triggerer_id, capacity, health_check_threshold, session: Session 
= NEW_SESSION
+    ) -> None:
         """
         Assign unassigned triggers based on a number of conditions.
 
-        Takes a triggerer_id, the capacity for that triggerer and the 
Triggerer job heartrate,
-        and assigns unassigned triggers until that capacity is reached, or 
there are no more
-        unassigned triggers.
+        Takes a triggerer_id, the capacity for that triggerer and the 
Triggerer job
+        health check threshold, and assigns unassigned triggers until that 
capacity is reached,
+        or there are no more unassigned triggers.
         """
         from airflow.jobs.job import Job  # To avoid circular import
 
@@ -214,9 +216,7 @@ class Trigger(Base):
 
         if capacity <= 0:
             return
-        # we multiply heartrate by a grace_multiplier to give the triggerer
-        # a chance to heartbeat before we consider it dead
-        health_check_threshold = heartrate * 2.1
+
         alive_triggerer_ids = session.scalars(
             select(Job.id).where(
                 Job.end_date.is_(None),
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 0e36bd4571..ee8e8e0b4f 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -21,7 +21,7 @@ import datetime
 import sys
 from unittest.mock import ANY, Mock, patch
 
-from pytest import raises
+import pytest
 from sqlalchemy.exc import OperationalError
 
 from airflow.executors.sequential_executor import SequentialExecutor
@@ -84,7 +84,7 @@ class TestJob:
 
         job = Job()
         job_runner = MockJobRunner(job=job, func=abort)
-        with raises(RuntimeError):
+        with pytest.raises(RuntimeError):
             run_job(job=job, execute_callable=job_runner._execute)
 
         assert job.state == State.FAILED
@@ -148,8 +148,9 @@ class TestJob:
         job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent 
heartbeat should not be alive"
 
-    def test_is_alive_scheduler(self):
-        job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
+    @pytest.mark.parametrize("job_type", ["SchedulerJob", "TriggererJob"])
+    def test_is_alive_scheduler(self, job_type):
+        job = Job(heartrate=10, state=State.RUNNING, job_type=job_type)
         assert job.is_alive() is True
 
         job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=20)
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 0c5e5eeadc..98c570c935 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -141,10 +141,11 @@ def test_assign_unassigned(session, create_task_instance):
     """
     Tests that unassigned triggers of all appropriate states are assigned.
     """
+    time_now = timezone.utcnow()
     triggerer_heartrate = 10
     finished_triggerer = Job(heartrate=triggerer_heartrate, 
state=State.SUCCESS)
     TriggererJobRunner(finished_triggerer)
-    finished_triggerer.end_date = timezone.utcnow() - 
datetime.timedelta(hours=1)
+    finished_triggerer.end_date = time_now - datetime.timedelta(hours=1)
     session.add(finished_triggerer)
     assert not finished_triggerer.is_alive()
     healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
@@ -155,22 +156,37 @@ def test_assign_unassigned(session, create_task_instance):
     TriggererJobRunner(new_triggerer)
     session.add(new_triggerer)
     assert new_triggerer.is_alive()
+    # This triggerer's last heartbeat is older than the check threshold, expect
+    # its triggers to be taken by other healthy triggerers below
+    unhealthy_triggerer = Job(
+        heartrate=triggerer_heartrate,
+        state=State.RUNNING,
+        latest_heartbeat=time_now - datetime.timedelta(seconds=100),
+    )
+    TriggererJobRunner(unhealthy_triggerer)
+    session.add(unhealthy_triggerer)
+    # Triggerer is not healthy, its last heartbeat was too long ago
+    assert not unhealthy_triggerer.is_alive()
     session.commit()
     trigger_on_healthy_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
     trigger_on_healthy_triggerer.id = 1
     trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+    trigger_on_unhealthy_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_unhealthy_triggerer.id = 2
+    trigger_on_unhealthy_triggerer.triggerer_id = unhealthy_triggerer.id
     trigger_on_killed_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-    trigger_on_killed_triggerer.id = 2
+    trigger_on_killed_triggerer.id = 3
     trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
     trigger_unassigned_to_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-    trigger_unassigned_to_triggerer.id = 3
+    trigger_unassigned_to_triggerer.id = 4
     assert trigger_unassigned_to_triggerer.triggerer_id is None
     session.add(trigger_on_healthy_triggerer)
+    session.add(trigger_on_unhealthy_triggerer)
     session.add(trigger_on_killed_triggerer)
     session.add(trigger_unassigned_to_triggerer)
     session.commit()
-    assert session.query(Trigger).count() == 3
-    Trigger.assign_unassigned(new_triggerer.id, 100, session=session, 
heartrate=triggerer_heartrate)
+    assert session.query(Trigger).count() == 4
+    Trigger.assign_unassigned(new_triggerer.id, 100, health_check_threshold=30)
     session.expire_all()
     # Check that trigger on killed triggerer and unassigned trigger are 
assigned to new triggerer
     assert (
@@ -186,61 +202,11 @@ def test_assign_unassigned(session, create_task_instance):
         session.query(Trigger).filter(Trigger.id == 
trigger_on_healthy_triggerer.id).one().triggerer_id
         == healthy_triggerer.id
     )
-
-
[email protected]("check_triggerer_heartrate", [10, 60, 300])
-def test_assign_unassigned_missing_heartbeat(session, create_task_instance, 
check_triggerer_heartrate):
-    """
-    Tests that the triggers assigned to a dead triggers are considered as 
unassigned
-    and they are  assigned to an alive triggerer.
-    """
-    import time_machine
-
-    block_triggerer_heartrate = 9999
-    with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t:
-        first_triggerer = Job(heartrate=block_triggerer_heartrate, 
state=State.RUNNING)
-        TriggererJobRunner(first_triggerer)
-        session.add(first_triggerer)
-        assert first_triggerer.is_alive()
-        second_triggerer = Job(heartrate=block_triggerer_heartrate, 
state=State.RUNNING)
-        TriggererJobRunner(second_triggerer)
-        session.add(second_triggerer)
-        assert second_triggerer.is_alive()
-        session.commit()
-        trigger_on_first_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-        trigger_on_first_triggerer.id = 1
-        trigger_on_first_triggerer.triggerer_id = first_triggerer.id
-        trigger_on_second_triggerer = 
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
-        trigger_on_second_triggerer.id = 2
-        trigger_on_second_triggerer.triggerer_id = second_triggerer.id
-        session.add(trigger_on_first_triggerer)
-        session.add(trigger_on_second_triggerer)
-        session.commit()
-        assert session.query(Trigger).count() == 2
-        triggers_ids = [
-            (first_triggerer.id, second_triggerer.id),
-            (first_triggerer.id, second_triggerer.id),
-            (first_triggerer.id, second_triggerer.id),
-            # Check that after more than 2.1 heartrates, the first triggerer 
is considered dead
-            # and the first trigger is assigned to the second triggerer
-            (second_triggerer.id, second_triggerer.id),
-        ]
-        for i in range(4):
-            Trigger.assign_unassigned(
-                second_triggerer.id, 100, session=session, 
heartrate=check_triggerer_heartrate
-            )
-            session.expire_all()
-            # Check that trigger on killed triggerer and unassigned trigger 
are assigned to new triggerer
-            assert (
-                session.query(Trigger).filter(Trigger.id == 
trigger_on_first_triggerer.id).one().triggerer_id
-                == triggers_ids[i][0]
-            )
-            assert (
-                session.query(Trigger).filter(Trigger.id == 
trigger_on_second_triggerer.id).one().triggerer_id
-                == triggers_ids[i][1]
-            )
-            t.shift(datetime.timedelta(seconds=check_triggerer_heartrate))
-            second_triggerer.latest_heartbeat += 
datetime.timedelta(seconds=check_triggerer_heartrate)
+    # Check that trigger on unhealthy triggerer is assigned to new triggerer
+    assert (
+        session.query(Trigger).filter(Trigger.id == 
trigger_on_unhealthy_triggerer.id).one().triggerer_id
+        == new_triggerer.id
+    )
 
 
 def test_get_sorted_triggers(session, create_task_instance):

Reply via email to