This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6ec3b9abb6 Configurable health check threshold for triggerer (#33089)
6ec3b9abb6 is described below
commit 6ec3b9abb67617ebbea2129421c05e45dc863bee
Author: Niko Oliveira <[email protected]>
AuthorDate: Sun Aug 6 15:25:53 2023 -0700
Configurable health check threshold for triggerer (#33089)
Recently the Triggerer was forced to use a health check threshold of
`trigger_heartbeat * 2.1`. With the default heartbeat of 5s, this
yields a threshold of 10.5s, whereas the previous threshold was 30s.
This leads to a very unstable situation where Triggerers are not given
a reasonable amount of time to heartbeat, and their triggers are taken
from them.
This change allows the user to configure this threshold the same way we
do for the scheduler.
---
airflow/config_templates/config.yml | 10 +++++
airflow/jobs/job.py | 2 +
airflow/jobs/triggerer_job_runner.py | 4 +-
airflow/models/trigger.py | 14 +++---
tests/jobs/test_base_job.py | 9 ++--
tests/models/test_trigger.py | 86 +++++++++++-------------------------
6 files changed, 53 insertions(+), 72 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index bad34da8d5..f868a6c6ea 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2258,6 +2258,16 @@ triggerer:
type: float
example: ~
default: "5"
+ triggerer_health_check_threshold:
+ description: |
+ If the last triggerer heartbeat happened more than
triggerer_health_check_threshold
+ ago (in seconds), triggerer is considered unhealthy.
+ This is used by the health check in the "/health" endpoint and in
`airflow jobs check` CLI
+ for TriggererJob.
+ version_added: 2.7.0
+ type: float
+ example: ~
+ default: "30"
kerberos:
description: ~
options:
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index add43e0aa5..bf404fc821 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -129,6 +129,8 @@ class Job(Base, LoggingMixin):
"""
if self.job_type == "SchedulerJob":
health_check_threshold: int = conf.getint("scheduler",
"scheduler_health_check_threshold")
+ elif self.job_type == "TriggererJob":
+ health_check_threshold: int = conf.getint("triggerer",
"triggerer_health_check_threshold")
else:
health_check_threshold: int = self.heartrate * grace_multiplier
return (
diff --git a/airflow/jobs/triggerer_job_runner.py
b/airflow/jobs/triggerer_job_runner.py
index 9fcc8d14d9..9f8dac6184 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -257,6 +257,8 @@ class TriggererJobRunner(BaseJobRunner["Job |
JobPydantic"], LoggingMixin):
else:
raise ValueError(f"Capacity number {capacity} is invalid")
+ self.health_check_threshold = conf.getint("triggerer",
"triggerer_health_check_threshold")
+
should_queue = True
if DISABLE_WRAPPER:
self.log.warning(
@@ -363,7 +365,7 @@ class TriggererJobRunner(BaseJobRunner["Job |
JobPydantic"], LoggingMixin):
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running
and update the runner."""
- Trigger.assign_unassigned(self.job.id, self.capacity,
self.job.heartrate)
+ Trigger.assign_unassigned(self.job.id, self.capacity,
self.health_check_threshold)
ids = Trigger.ids_for_triggerer(self.job.id)
self.trigger_runner.update_triggers(set(ids))
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index bba60c9449..0161d5c9fe 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -199,13 +199,15 @@ class Trigger(Base):
@classmethod
@internal_api_call
@provide_session
- def assign_unassigned(cls, triggerer_id, capacity, heartrate, session:
Session = NEW_SESSION) -> None:
+ def assign_unassigned(
+ cls, triggerer_id, capacity, health_check_threshold, session: Session
= NEW_SESSION
+ ) -> None:
"""
Assign unassigned triggers based on a number of conditions.
- Takes a triggerer_id, the capacity for that triggerer and the
Triggerer job heartrate,
- and assigns unassigned triggers until that capacity is reached, or
there are no more
- unassigned triggers.
+ Takes a triggerer_id, the capacity for that triggerer and the
Triggerer job
+ health check threshold, and assigns unassigned triggers until that
capacity is reached,
+ or there are no more unassigned triggers.
"""
from airflow.jobs.job import Job # To avoid circular import
@@ -214,9 +216,7 @@ class Trigger(Base):
if capacity <= 0:
return
- # we multiply heartrate by a grace_multiplier to give the triggerer
- # a chance to heartbeat before we consider it dead
- health_check_threshold = heartrate * 2.1
+
alive_triggerer_ids = session.scalars(
select(Job.id).where(
Job.end_date.is_(None),
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 0e36bd4571..ee8e8e0b4f 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -21,7 +21,7 @@ import datetime
import sys
from unittest.mock import ANY, Mock, patch
-from pytest import raises
+import pytest
from sqlalchemy.exc import OperationalError
from airflow.executors.sequential_executor import SequentialExecutor
@@ -84,7 +84,7 @@ class TestJob:
job = Job()
job_runner = MockJobRunner(job=job, func=abort)
- with raises(RuntimeError):
+ with pytest.raises(RuntimeError):
run_job(job=job, execute_callable=job_runner._execute)
assert job.state == State.FAILED
@@ -148,8 +148,9 @@ class TestJob:
job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=10)
assert job.is_alive() is False, "Completed jobs even with recent
heartbeat should not be alive"
- def test_is_alive_scheduler(self):
- job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
+ @pytest.mark.parametrize("job_type", ["SchedulerJob", "TriggererJob"])
+ def test_is_alive_scheduler(self, job_type):
+ job = Job(heartrate=10, state=State.RUNNING, job_type=job_type)
assert job.is_alive() is True
job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=20)
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 0c5e5eeadc..98c570c935 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -141,10 +141,11 @@ def test_assign_unassigned(session, create_task_instance):
"""
Tests that unassigned triggers of all appropriate states are assigned.
"""
+ time_now = timezone.utcnow()
triggerer_heartrate = 10
finished_triggerer = Job(heartrate=triggerer_heartrate,
state=State.SUCCESS)
TriggererJobRunner(finished_triggerer)
- finished_triggerer.end_date = timezone.utcnow() -
datetime.timedelta(hours=1)
+ finished_triggerer.end_date = time_now - datetime.timedelta(hours=1)
session.add(finished_triggerer)
assert not finished_triggerer.is_alive()
healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
@@ -155,22 +156,37 @@ def test_assign_unassigned(session, create_task_instance):
TriggererJobRunner(new_triggerer)
session.add(new_triggerer)
assert new_triggerer.is_alive()
+ # This triggerer's last heartbeat is older than the health check threshold, so
+ # its triggers are expected to be taken over by other healthy triggerers below
+ unhealthy_triggerer = Job(
+ heartrate=triggerer_heartrate,
+ state=State.RUNNING,
+ latest_heartbeat=time_now - datetime.timedelta(seconds=100),
+ )
+ TriggererJobRunner(unhealthy_triggerer)
+ session.add(unhealthy_triggerer)
+ # Triggerer is not healthy: its last heartbeat was too long ago
+ assert not unhealthy_triggerer.is_alive()
session.commit()
trigger_on_healthy_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_healthy_triggerer.id = 1
trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+ trigger_on_unhealthy_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+ trigger_on_unhealthy_triggerer.id = 2
+ trigger_on_unhealthy_triggerer.triggerer_id = unhealthy_triggerer.id
trigger_on_killed_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
- trigger_on_killed_triggerer.id = 2
+ trigger_on_killed_triggerer.id = 3
trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
trigger_unassigned_to_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
- trigger_unassigned_to_triggerer.id = 3
+ trigger_unassigned_to_triggerer.id = 4
assert trigger_unassigned_to_triggerer.triggerer_id is None
session.add(trigger_on_healthy_triggerer)
+ session.add(trigger_on_unhealthy_triggerer)
session.add(trigger_on_killed_triggerer)
session.add(trigger_unassigned_to_triggerer)
session.commit()
- assert session.query(Trigger).count() == 3
- Trigger.assign_unassigned(new_triggerer.id, 100, session=session,
heartrate=triggerer_heartrate)
+ assert session.query(Trigger).count() == 4
+ Trigger.assign_unassigned(new_triggerer.id, 100, health_check_threshold=30)
session.expire_all()
# Check that trigger on killed triggerer and unassigned trigger are
assigned to new triggerer
assert (
@@ -186,61 +202,11 @@ def test_assign_unassigned(session, create_task_instance):
session.query(Trigger).filter(Trigger.id ==
trigger_on_healthy_triggerer.id).one().triggerer_id
== healthy_triggerer.id
)
-
-
[email protected]("check_triggerer_heartrate", [10, 60, 300])
-def test_assign_unassigned_missing_heartbeat(session, create_task_instance,
check_triggerer_heartrate):
- """
- Tests that the triggers assigned to a dead triggers are considered as
unassigned
- and they are assigned to an alive triggerer.
- """
- import time_machine
-
- block_triggerer_heartrate = 9999
- with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t:
- first_triggerer = Job(heartrate=block_triggerer_heartrate,
state=State.RUNNING)
- TriggererJobRunner(first_triggerer)
- session.add(first_triggerer)
- assert first_triggerer.is_alive()
- second_triggerer = Job(heartrate=block_triggerer_heartrate,
state=State.RUNNING)
- TriggererJobRunner(second_triggerer)
- session.add(second_triggerer)
- assert second_triggerer.is_alive()
- session.commit()
- trigger_on_first_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
- trigger_on_first_triggerer.id = 1
- trigger_on_first_triggerer.triggerer_id = first_triggerer.id
- trigger_on_second_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
- trigger_on_second_triggerer.id = 2
- trigger_on_second_triggerer.triggerer_id = second_triggerer.id
- session.add(trigger_on_first_triggerer)
- session.add(trigger_on_second_triggerer)
- session.commit()
- assert session.query(Trigger).count() == 2
- triggers_ids = [
- (first_triggerer.id, second_triggerer.id),
- (first_triggerer.id, second_triggerer.id),
- (first_triggerer.id, second_triggerer.id),
- # Check that after more than 2.1 heartrates, the first triggerer
is considered dead
- # and the first trigger is assigned to the second triggerer
- (second_triggerer.id, second_triggerer.id),
- ]
- for i in range(4):
- Trigger.assign_unassigned(
- second_triggerer.id, 100, session=session,
heartrate=check_triggerer_heartrate
- )
- session.expire_all()
- # Check that trigger on killed triggerer and unassigned trigger
are assigned to new triggerer
- assert (
- session.query(Trigger).filter(Trigger.id ==
trigger_on_first_triggerer.id).one().triggerer_id
- == triggers_ids[i][0]
- )
- assert (
- session.query(Trigger).filter(Trigger.id ==
trigger_on_second_triggerer.id).one().triggerer_id
- == triggers_ids[i][1]
- )
- t.shift(datetime.timedelta(seconds=check_triggerer_heartrate))
- second_triggerer.latest_heartbeat +=
datetime.timedelta(seconds=check_triggerer_heartrate)
+ # Check that trigger on unhealthy triggerer is assigned to new triggerer
+ assert (
+ session.query(Trigger).filter(Trigger.id ==
trigger_on_unhealthy_triggerer.id).one().triggerer_id
+ == new_triggerer.id
+ )
def test_get_sorted_triggers(session, create_task_instance):