This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d117728cd6 Fix triggerers alive check and add a new conf for triggerer
heartbeat rate (#32123)
d117728cd6 is described below
commit d117728cd6f337266bebcf4916325d5de815fe03
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Jun 30 22:19:06 2023 +0200
Fix triggerers alive check and add a new conf for triggerer heartbeat rate
(#32123)
Signed-off-by: Hussein Awala <[email protected]>
---
airflow/cli/commands/triggerer_command.py | 3 +-
airflow/config_templates/config.yml | 7 +++
airflow/config_templates/default_airflow.cfg | 3 ++
airflow/jobs/triggerer_job_runner.py | 2 +-
airflow/models/trigger.py | 13 +++---
tests/models/test_trigger.py | 64 ++++++++++++++++++++++++++--
6 files changed, 81 insertions(+), 11 deletions(-)
diff --git a/airflow/cli/commands/triggerer_command.py
b/airflow/cli/commands/triggerer_command.py
index 2da4821877..aa06d641c7 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -55,7 +55,8 @@ def triggerer(args):
"""Starts Airflow Triggerer."""
settings.MASK_SECRETS_IN_LOGS = True
print(settings.HEADER)
- triggerer_job_runner = TriggererJobRunner(job=Job(),
capacity=args.capacity)
+ triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
+ triggerer_job_runner =
TriggererJobRunner(job=Job(heartrate=triggerer_heartrate),
capacity=args.capacity)
if args.daemon:
pid, stdout, stderr, log_file = setup_locations(
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index b6b1603265..a252716ed1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2565,6 +2565,13 @@ triggerer:
type: string
example: ~
default: "1000"
+ job_heartbeat_sec:
+ description: |
+ How often to heartbeat the Triggerer job to ensure it hasn't been
killed.
+ version_added: 2.6.3
+ type: float
+ example: ~
+ default: "5"
kerberos:
description: ~
options:
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 1fbf58f7bc..4eaab9ae95 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1329,6 +1329,9 @@ task_queued_timeout_check_interval = 120.0
# How many triggers a single Triggerer will run at once, by default.
default_capacity = 1000
+# How often to heartbeat the Triggerer job to ensure it hasn't been killed.
+job_heartbeat_sec = 5
+
[kerberos]
ccache = /tmp/airflow_krb5_ccache
diff --git a/airflow/jobs/triggerer_job_runner.py
b/airflow/jobs/triggerer_job_runner.py
index 32435cc1e1..633e90944c 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -374,7 +374,7 @@ class TriggererJobRunner(BaseJobRunner["Job |
JobPydantic"], LoggingMixin):
adds them to our runner, and then removes ones from it we no longer
need.
"""
- Trigger.assign_unassigned(self.job.id, self.capacity)
+ Trigger.assign_unassigned(self.job.id, self.capacity,
self.job.heartrate)
ids = Trigger.ids_for_triggerer(self.job.id)
self.trigger_runner.update_triggers(set(ids))
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 603ff95157..c0d749eb59 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -200,10 +200,11 @@ class Trigger(Base):
@classmethod
@internal_api_call
@provide_session
- def assign_unassigned(cls, triggerer_id, capacity, session: Session =
NEW_SESSION) -> None:
+ def assign_unassigned(cls, triggerer_id, capacity, heartrate, session:
Session = NEW_SESSION) -> None:
"""
- Takes a triggerer_id and the capacity for that triggerer and assigns
unassigned
- triggers until that capacity is reached, or there are no more
unassigned triggers.
+ Takes a triggerer_id, the capacity for that triggerer and the
Triggerer job heartrate,
+ and assigns unassigned triggers until that capacity is reached, or
there are no more
+ unassigned triggers.
"""
from airflow.jobs.job import Job # To avoid circular import
@@ -212,12 +213,14 @@ class Trigger(Base):
if capacity <= 0:
return
-
+ # we multiply heartrate by a grace_multiplier to give the triggerer
+ # a chance to heartbeat before we consider it dead
+ health_check_threshold = heartrate * 2.1
alive_triggerer_ids = [
row[0]
for row in session.query(Job.id).filter(
Job.end_date.is_(None),
- Job.latest_heartbeat > timezone.utcnow() -
datetime.timedelta(seconds=30),
+ Job.latest_heartbeat > timezone.utcnow() -
datetime.timedelta(seconds=health_check_threshold),
Job.job_type == "TriggererJob",
)
]
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 4abd7e9e42..0c5e5eeadc 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -141,16 +141,17 @@ def test_assign_unassigned(session, create_task_instance):
"""
Tests that unassigned triggers of all appropriate states are assigned.
"""
- finished_triggerer = Job(heartrate=10, state=State.SUCCESS)
+ triggerer_heartrate = 10
+ finished_triggerer = Job(heartrate=triggerer_heartrate,
state=State.SUCCESS)
TriggererJobRunner(finished_triggerer)
finished_triggerer.end_date = timezone.utcnow() -
datetime.timedelta(hours=1)
session.add(finished_triggerer)
assert not finished_triggerer.is_alive()
- healthy_triggerer = Job(heartrate=10, state=State.RUNNING)
+ healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
TriggererJobRunner(healthy_triggerer)
session.add(healthy_triggerer)
assert healthy_triggerer.is_alive()
- new_triggerer = Job(heartrate=10, state=State.RUNNING)
+ new_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
TriggererJobRunner(new_triggerer)
session.add(new_triggerer)
assert new_triggerer.is_alive()
@@ -169,7 +170,7 @@ def test_assign_unassigned(session, create_task_instance):
session.add(trigger_unassigned_to_triggerer)
session.commit()
assert session.query(Trigger).count() == 3
- Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
+ Trigger.assign_unassigned(new_triggerer.id, 100, session=session,
heartrate=triggerer_heartrate)
session.expire_all()
# Check that trigger on killed triggerer and unassigned trigger are
assigned to new triggerer
assert (
@@ -187,6 +188,61 @@ def test_assign_unassigned(session, create_task_instance):
)
[email protected]("check_triggerer_heartrate", [10, 60, 300])
+def test_assign_unassigned_missing_heartbeat(session, create_task_instance,
check_triggerer_heartrate):
+ """
+ Tests that the triggers assigned to a dead triggerer are considered as
unassigned
+ and they are assigned to an alive triggerer.
+ """
+ import time_machine
+
+ block_triggerer_heartrate = 9999
+ with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t:
+ first_triggerer = Job(heartrate=block_triggerer_heartrate,
state=State.RUNNING)
+ TriggererJobRunner(first_triggerer)
+ session.add(first_triggerer)
+ assert first_triggerer.is_alive()
+ second_triggerer = Job(heartrate=block_triggerer_heartrate,
state=State.RUNNING)
+ TriggererJobRunner(second_triggerer)
+ session.add(second_triggerer)
+ assert second_triggerer.is_alive()
+ session.commit()
+ trigger_on_first_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+ trigger_on_first_triggerer.id = 1
+ trigger_on_first_triggerer.triggerer_id = first_triggerer.id
+ trigger_on_second_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+ trigger_on_second_triggerer.id = 2
+ trigger_on_second_triggerer.triggerer_id = second_triggerer.id
+ session.add(trigger_on_first_triggerer)
+ session.add(trigger_on_second_triggerer)
+ session.commit()
+ assert session.query(Trigger).count() == 2
+ triggers_ids = [
+ (first_triggerer.id, second_triggerer.id),
+ (first_triggerer.id, second_triggerer.id),
+ (first_triggerer.id, second_triggerer.id),
+ # Check that after more than 2.1 heartrates, the first triggerer
is considered dead
+ # and the first trigger is assigned to the second triggerer
+ (second_triggerer.id, second_triggerer.id),
+ ]
+ for i in range(4):
+ Trigger.assign_unassigned(
+ second_triggerer.id, 100, session=session,
heartrate=check_triggerer_heartrate
+ )
+ session.expire_all()
+ # Check that each trigger is assigned to the triggerer expected at this step
+ assert (
+ session.query(Trigger).filter(Trigger.id ==
trigger_on_first_triggerer.id).one().triggerer_id
+ == triggers_ids[i][0]
+ )
+ assert (
+ session.query(Trigger).filter(Trigger.id ==
trigger_on_second_triggerer.id).one().triggerer_id
+ == triggers_ids[i][1]
+ )
+ t.shift(datetime.timedelta(seconds=check_triggerer_heartrate))
+ second_triggerer.latest_heartbeat +=
datetime.timedelta(seconds=check_triggerer_heartrate)
+
+
def test_get_sorted_triggers(session, create_task_instance):
"""
Tests that triggers are sorted by the creation_date.