This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d117728cd6 Fix triggerers alive check and add a new conf for triggerer heartbeat rate (#32123)
d117728cd6 is described below

commit d117728cd6f337266bebcf4916325d5de815fe03
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Jun 30 22:19:06 2023 +0200

    Fix triggerers alive check and add a new conf for triggerer heartbeat rate (#32123)
    
    Signed-off-by: Hussein Awala <[email protected]>
---
 airflow/cli/commands/triggerer_command.py    |  3 +-
 airflow/config_templates/config.yml          |  7 +++
 airflow/config_templates/default_airflow.cfg |  3 ++
 airflow/jobs/triggerer_job_runner.py         |  2 +-
 airflow/models/trigger.py                    | 13 +++---
 tests/models/test_trigger.py                 | 64 ++++++++++++++++++++++++++--
 6 files changed, 81 insertions(+), 11 deletions(-)

diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index 2da4821877..aa06d641c7 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -55,7 +55,8 @@ def triggerer(args):
     """Starts Airflow Triggerer."""
     settings.MASK_SECRETS_IN_LOGS = True
     print(settings.HEADER)
-    triggerer_job_runner = TriggererJobRunner(job=Job(), capacity=args.capacity)
+    triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
+    triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity)
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index b6b1603265..a252716ed1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2565,6 +2565,13 @@ triggerer:
       type: string
       example: ~
       default: "1000"
+    job_heartbeat_sec:
+      description: |
+        How often to heartbeat the Triggerer job to ensure it hasn't been killed.
+      version_added: 2.6.3
+      type: float
+      example: ~
+      default: "5"
 kerberos:
   description: ~
   options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 1fbf58f7bc..4eaab9ae95 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1329,6 +1329,9 @@ task_queued_timeout_check_interval = 120.0
 # How many triggers a single Triggerer will run at once, by default.
 default_capacity = 1000
 
+# How often to heartbeat the Triggerer job to ensure it hasn't been killed.
+job_heartbeat_sec = 5
+
 [kerberos]
 ccache = /tmp/airflow_krb5_ccache
 
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 32435cc1e1..633e90944c 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -374,7 +374,7 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
         adds them to our runner, and then removes ones from it we no longer
         need.
         """
-        Trigger.assign_unassigned(self.job.id, self.capacity)
+        Trigger.assign_unassigned(self.job.id, self.capacity, self.job.heartrate)
         ids = Trigger.ids_for_triggerer(self.job.id)
         self.trigger_runner.update_triggers(set(ids))
 
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 603ff95157..c0d749eb59 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -200,10 +200,11 @@ class Trigger(Base):
     @classmethod
     @internal_api_call
     @provide_session
-    def assign_unassigned(cls, triggerer_id, capacity, session: Session = NEW_SESSION) -> None:
+    def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: Session = NEW_SESSION) -> None:
         """
-        Takes a triggerer_id and the capacity for that triggerer and assigns unassigned
-        triggers until that capacity is reached, or there are no more unassigned triggers.
+        Takes a triggerer_id, the capacity for that triggerer and the Triggerer job heartrate,
+        and assigns unassigned triggers until that capacity is reached, or there are no more
+        unassigned triggers.
         """
         from airflow.jobs.job import Job  # To avoid circular import
 
@@ -212,12 +213,14 @@ class Trigger(Base):
 
         if capacity <= 0:
             return
-
+        # we multiply heartrate by a grace_multiplier to give the triggerer
+        # a chance to heartbeat before we consider it dead
+        health_check_threshold = heartrate * 2.1
         alive_triggerer_ids = [
             row[0]
             for row in session.query(Job.id).filter(
                 Job.end_date.is_(None),
-                Job.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
+                Job.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=health_check_threshold),
                 Job.job_type == "TriggererJob",
             )
         ]
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 4abd7e9e42..0c5e5eeadc 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -141,16 +141,17 @@ def test_assign_unassigned(session, create_task_instance):
     """
     Tests that unassigned triggers of all appropriate states are assigned.
     """
-    finished_triggerer = Job(heartrate=10, state=State.SUCCESS)
+    triggerer_heartrate = 10
+    finished_triggerer = Job(heartrate=triggerer_heartrate, state=State.SUCCESS)
     TriggererJobRunner(finished_triggerer)
     finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1)
     session.add(finished_triggerer)
     assert not finished_triggerer.is_alive()
-    healthy_triggerer = Job(heartrate=10, state=State.RUNNING)
+    healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
     TriggererJobRunner(healthy_triggerer)
     session.add(healthy_triggerer)
     assert healthy_triggerer.is_alive()
-    new_triggerer = Job(heartrate=10, state=State.RUNNING)
+    new_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
     TriggererJobRunner(new_triggerer)
     session.add(new_triggerer)
     assert new_triggerer.is_alive()
@@ -169,7 +170,7 @@ def test_assign_unassigned(session, create_task_instance):
     session.add(trigger_unassigned_to_triggerer)
     session.commit()
     assert session.query(Trigger).count() == 3
-    Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
+    Trigger.assign_unassigned(new_triggerer.id, 100, session=session, heartrate=triggerer_heartrate)
     session.expire_all()
     # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer
     assert (
@@ -187,6 +188,61 @@ def test_assign_unassigned(session, create_task_instance):
     )
 
 
+@pytest.mark.parametrize("check_triggerer_heartrate", [10, 60, 300])
+def test_assign_unassigned_missing_heartbeat(session, create_task_instance, check_triggerer_heartrate):
+    """
+    Tests that the triggers assigned to a dead triggerer are considered as unassigned
+    and they are assigned to an alive triggerer.
+    """
+    import time_machine
+
+    block_triggerer_heartrate = 9999
+    with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t:
+        first_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING)
+        TriggererJobRunner(first_triggerer)
+        session.add(first_triggerer)
+        assert first_triggerer.is_alive()
+        second_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING)
+        TriggererJobRunner(second_triggerer)
+        session.add(second_triggerer)
+        assert second_triggerer.is_alive()
+        session.commit()
+        trigger_on_first_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+        trigger_on_first_triggerer.id = 1
+        trigger_on_first_triggerer.triggerer_id = first_triggerer.id
+        trigger_on_second_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+        trigger_on_second_triggerer.id = 2
+        trigger_on_second_triggerer.triggerer_id = second_triggerer.id
+        session.add(trigger_on_first_triggerer)
+        session.add(trigger_on_second_triggerer)
+        session.commit()
+        assert session.query(Trigger).count() == 2
+        triggers_ids = [
+            (first_triggerer.id, second_triggerer.id),
+            (first_triggerer.id, second_triggerer.id),
+            (first_triggerer.id, second_triggerer.id),
+            # Check that after more than 2.1 heartrates, the first triggerer is considered dead
+            # and the first trigger is assigned to the second triggerer
+            (second_triggerer.id, second_triggerer.id),
+        ]
+        for i in range(4):
+            Trigger.assign_unassigned(
+                second_triggerer.id, 100, session=session, heartrate=check_triggerer_heartrate
+            )
+            session.expire_all()
+            # Check that each trigger is assigned to the triggerer expected at this iteration
+            assert (
+                session.query(Trigger).filter(Trigger.id == trigger_on_first_triggerer.id).one().triggerer_id
+                == triggers_ids[i][0]
+            )
+            assert (
+                session.query(Trigger).filter(Trigger.id == trigger_on_second_triggerer.id).one().triggerer_id
+                == triggers_ids[i][1]
+            )
+            t.shift(datetime.timedelta(seconds=check_triggerer_heartrate))
+            second_triggerer.latest_heartbeat += datetime.timedelta(seconds=check_triggerer_heartrate)
+
+
 def test_get_sorted_triggers(session, create_task_instance):
     """
     Tests that triggers are sorted by the creation_date.

Reply via email to