This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit fdb96229b61c3e380bb84886cdc757a0e83b2c2a Author: Niko Oliveira <[email protected]> AuthorDate: Sun Aug 6 15:25:53 2023 -0700 Configurable health check threshold for triggerer (#33089) Recently the Triggerer was forced to a health check threshold of `trigger_heartbeat * 2.1`. With the default heartbeat of 5s, this generates a threshold of 10.5s, whereas the previous threshold was 30s. This leads to a very unstable situation where Triggerers are not given a reasonable amount of time to heartbeat and their triggers are taken from them. This change allows the user to configure this threshold the same way we do for the scheduler. (cherry picked from commit 6ec3b9abb67617ebbea2129421c05e45dc863bee) --- airflow/config_templates/config.yml | 10 +++++ airflow/jobs/job.py | 2 + airflow/jobs/triggerer_job_runner.py | 4 +- airflow/models/trigger.py | 14 +++--- tests/jobs/test_base_job.py | 9 ++-- tests/models/test_trigger.py | 86 +++++++++++------------------------- 6 files changed, 53 insertions(+), 72 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 28c1942041..8b3188c062 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2311,6 +2311,16 @@ triggerer: type: float example: ~ default: "5" + triggerer_health_check_threshold: + description: | + If the last triggerer heartbeat happened more than triggerer_health_check_threshold + ago (in seconds), triggerer is considered unhealthy. + This is used by the health check in the "/health" endpoint and in `airflow jobs check` CLI + for TriggererJob. 
+ version_added: 2.7.0 + type: float + example: ~ + default: "30" kerberos: description: ~ options: diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index add43e0aa5..bf404fc821 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -129,6 +129,8 @@ class Job(Base, LoggingMixin): """ if self.job_type == "SchedulerJob": health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold") + elif self.job_type == "TriggererJob": + health_check_threshold: int = conf.getint("triggerer", "triggerer_health_check_threshold") else: health_check_threshold: int = self.heartrate * grace_multiplier return ( diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 9fcc8d14d9..9f8dac6184 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -257,6 +257,8 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin): else: raise ValueError(f"Capacity number {capacity} is invalid") + self.health_check_threshold = conf.getint("triggerer", "triggerer_health_check_threshold") + should_queue = True if DISABLE_WRAPPER: self.log.warning( @@ -363,7 +365,7 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin): def load_triggers(self): """Query the database for the triggers we're supposed to be running and update the runner.""" - Trigger.assign_unassigned(self.job.id, self.capacity, self.job.heartrate) + Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold) ids = Trigger.ids_for_triggerer(self.job.id) self.trigger_runner.update_triggers(set(ids)) diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index bba60c9449..0161d5c9fe 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -199,13 +199,15 @@ class Trigger(Base): @classmethod @internal_api_call @provide_session - def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: Session = NEW_SESSION) -> None: + def 
assign_unassigned( + cls, triggerer_id, capacity, health_check_threshold, session: Session = NEW_SESSION + ) -> None: """ Assign unassigned triggers based on a number of conditions. - Takes a triggerer_id, the capacity for that triggerer and the Triggerer job heartrate, - and assigns unassigned triggers until that capacity is reached, or there are no more - unassigned triggers. + Takes a triggerer_id, the capacity for that triggerer and the Triggerer job + health check threshold, and assigns unassigned triggers until that capacity is reached, + or there are no more unassigned triggers. """ from airflow.jobs.job import Job # To avoid circular import @@ -214,9 +216,7 @@ class Trigger(Base): if capacity <= 0: return - # we multiply heartrate by a grace_multiplier to give the triggerer - # a chance to heartbeat before we consider it dead - health_check_threshold = heartrate * 2.1 + alive_triggerer_ids = session.scalars( select(Job.id).where( Job.end_date.is_(None), diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index 0e36bd4571..ee8e8e0b4f 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -21,7 +21,7 @@ import datetime import sys from unittest.mock import ANY, Mock, patch -from pytest import raises +import pytest from sqlalchemy.exc import OperationalError from airflow.executors.sequential_executor import SequentialExecutor @@ -84,7 +84,7 @@ class TestJob: job = Job() job_runner = MockJobRunner(job=job, func=abort) - with raises(RuntimeError): + with pytest.raises(RuntimeError): run_job(job=job, execute_callable=job_runner._execute) assert job.state == State.FAILED @@ -148,8 +148,9 @@ class TestJob: job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10) assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive" - def test_is_alive_scheduler(self): - job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob") + 
@pytest.mark.parametrize("job_type", ["SchedulerJob", "TriggererJob"]) + def test_is_alive_scheduler(self, job_type): + job = Job(heartrate=10, state=State.RUNNING, job_type=job_type) assert job.is_alive() is True job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20) diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py index 0c5e5eeadc..98c570c935 100644 --- a/tests/models/test_trigger.py +++ b/tests/models/test_trigger.py @@ -141,10 +141,11 @@ def test_assign_unassigned(session, create_task_instance): """ Tests that unassigned triggers of all appropriate states are assigned. """ + time_now = timezone.utcnow() triggerer_heartrate = 10 finished_triggerer = Job(heartrate=triggerer_heartrate, state=State.SUCCESS) TriggererJobRunner(finished_triggerer) - finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1) + finished_triggerer.end_date = time_now - datetime.timedelta(hours=1) session.add(finished_triggerer) assert not finished_triggerer.is_alive() healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING) @@ -155,22 +156,37 @@ def test_assign_unassigned(session, create_task_instance): TriggererJobRunner(new_triggerer) session.add(new_triggerer) assert new_triggerer.is_alive() + # This triggerer's last heartbeat is older than the check threshold, expect + # its triggers to be taken by other healthy triggerers below + unhealthy_triggerer = Job( + heartrate=triggerer_heartrate, + state=State.RUNNING, + latest_heartbeat=time_now - datetime.timedelta(seconds=100), + ) + TriggererJobRunner(unhealthy_triggerer) + session.add(unhealthy_triggerer) + # Triggerer is not healthy; its last heartbeat was too long ago + assert not unhealthy_triggerer.is_alive() session.commit() trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) trigger_on_healthy_triggerer.id = 1 trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id + 
trigger_on_unhealthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + trigger_on_unhealthy_triggerer.id = 2 + trigger_on_unhealthy_triggerer.triggerer_id = unhealthy_triggerer.id trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_on_killed_triggerer.id = 2 + trigger_on_killed_triggerer.id = 3 trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_unassigned_to_triggerer.id = 3 + trigger_unassigned_to_triggerer.id = 4 assert trigger_unassigned_to_triggerer.triggerer_id is None session.add(trigger_on_healthy_triggerer) + session.add(trigger_on_unhealthy_triggerer) session.add(trigger_on_killed_triggerer) session.add(trigger_unassigned_to_triggerer) session.commit() - assert session.query(Trigger).count() == 3 - Trigger.assign_unassigned(new_triggerer.id, 100, session=session, heartrate=triggerer_heartrate) + assert session.query(Trigger).count() == 4 + Trigger.assign_unassigned(new_triggerer.id, 100, health_check_threshold=30) session.expire_all() # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer assert ( @@ -186,61 +202,11 @@ def test_assign_unassigned(session, create_task_instance): session.query(Trigger).filter(Trigger.id == trigger_on_healthy_triggerer.id).one().triggerer_id == healthy_triggerer.id ) - - [email protected]("check_triggerer_heartrate", [10, 60, 300]) -def test_assign_unassigned_missing_heartbeat(session, create_task_instance, check_triggerer_heartrate): - """ - Tests that the triggers assigned to a dead triggers are considered as unassigned - and they are assigned to an alive triggerer. 
- """ - import time_machine - - block_triggerer_heartrate = 9999 - with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t: - first_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING) - TriggererJobRunner(first_triggerer) - session.add(first_triggerer) - assert first_triggerer.is_alive() - second_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING) - TriggererJobRunner(second_triggerer) - session.add(second_triggerer) - assert second_triggerer.is_alive() - session.commit() - trigger_on_first_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_on_first_triggerer.id = 1 - trigger_on_first_triggerer.triggerer_id = first_triggerer.id - trigger_on_second_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_on_second_triggerer.id = 2 - trigger_on_second_triggerer.triggerer_id = second_triggerer.id - session.add(trigger_on_first_triggerer) - session.add(trigger_on_second_triggerer) - session.commit() - assert session.query(Trigger).count() == 2 - triggers_ids = [ - (first_triggerer.id, second_triggerer.id), - (first_triggerer.id, second_triggerer.id), - (first_triggerer.id, second_triggerer.id), - # Check that after more than 2.1 heartrates, the first triggerer is considered dead - # and the first trigger is assigned to the second triggerer - (second_triggerer.id, second_triggerer.id), - ] - for i in range(4): - Trigger.assign_unassigned( - second_triggerer.id, 100, session=session, heartrate=check_triggerer_heartrate - ) - session.expire_all() - # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer - assert ( - session.query(Trigger).filter(Trigger.id == trigger_on_first_triggerer.id).one().triggerer_id - == triggers_ids[i][0] - ) - assert ( - session.query(Trigger).filter(Trigger.id == trigger_on_second_triggerer.id).one().triggerer_id - == triggers_ids[i][1] - ) - 
t.shift(datetime.timedelta(seconds=check_triggerer_heartrate)) - second_triggerer.latest_heartbeat += datetime.timedelta(seconds=check_triggerer_heartrate) + # Check that trigger on unhealthy triggerer is assigned to new triggerer + assert ( + session.query(Trigger).filter(Trigger.id == trigger_on_unhealthy_triggerer.id).one().triggerer_id + == new_triggerer.id + ) def test_get_sorted_triggers(session, create_task_instance):
