This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cf31c6f1368b69957d7a1e84538c5e336c1bd28d Author: Hussein Awala <[email protected]> AuthorDate: Fri Jun 30 22:19:06 2023 +0200 Fix triggerers alive check and add a new conf for triggerer heartbeat rate (#32123) Signed-off-by: Hussein Awala <[email protected]> (cherry picked from commit d117728cd6f337266bebcf4916325d5de815fe03) --- airflow/cli/commands/triggerer_command.py | 3 +- airflow/config_templates/config.yml | 7 +++ airflow/config_templates/default_airflow.cfg | 3 ++ airflow/jobs/triggerer_job_runner.py | 2 +- airflow/models/trigger.py | 13 +++--- tests/models/test_trigger.py | 64 ++++++++++++++++++++++++++-- 6 files changed, 81 insertions(+), 11 deletions(-) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index 7bfd77bf8e..d1a63e9c22 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -55,7 +55,8 @@ def triggerer(args): """Starts Airflow Triggerer.""" settings.MASK_SECRETS_IN_LOGS = True print(settings.HEADER) - triggerer_job_runner = TriggererJobRunner(job=Job(), capacity=args.capacity) + triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") + triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) if args.daemon: pid, stdout, stderr, log_file = setup_locations( diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 4653a82416..897ed3b402 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2459,6 +2459,13 @@ triggerer: type: string example: ~ default: "1000" + job_heartbeat_sec: + description: | + How often to heartbeat the Triggerer job to ensure it hasn't been killed. 
+ version_added: 2.6.3 + type: float + example: ~ + default: "5" kerberos: description: ~ options: diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c765f17ead..1908c39bf5 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1254,6 +1254,9 @@ allowed_run_id_pattern = ^[A-Za-z0-9_.~:+-]+$ # How many triggers a single Triggerer will run at once, by default. default_capacity = 1000 +# How often to heartbeat the Triggerer job to ensure it hasn't been killed. +job_heartbeat_sec = 5 + [kerberos] ccache = /tmp/airflow_krb5_ccache diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 8cbc17b228..c1168a09b1 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -374,7 +374,7 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin): adds them to our runner, and then removes ones from it we no longer need. """ - Trigger.assign_unassigned(self.job.id, self.capacity) + Trigger.assign_unassigned(self.job.id, self.capacity, self.job.heartrate) ids = Trigger.ids_for_triggerer(self.job.id) self.trigger_runner.update_triggers(set(ids)) diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index adad7ba1c3..e65a65505a 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -196,10 +196,11 @@ class Trigger(Base): @classmethod @internal_api_call @provide_session - def assign_unassigned(cls, triggerer_id, capacity, session: Session = NEW_SESSION) -> None: + def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: Session = NEW_SESSION) -> None: """ - Takes a triggerer_id and the capacity for that triggerer and assigns unassigned - triggers until that capacity is reached, or there are no more unassigned triggers. 
+ Takes a triggerer_id, the capacity for that triggerer and the Triggerer job heartrate, + and assigns unassigned triggers until that capacity is reached, or there are no more + unassigned triggers. """ from airflow.jobs.job import Job # To avoid circular import @@ -208,12 +209,14 @@ class Trigger(Base): if capacity <= 0: return - + # we multiply heartrate by a grace_multiplier to give the triggerer + # a chance to heartbeat before we consider it dead + health_check_threshold = heartrate * 2.1 alive_triggerer_ids = [ row[0] for row in session.query(Job.id).filter( Job.end_date.is_(None), - Job.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30), + Job.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=health_check_threshold), Job.job_type == "TriggererJob", ) ] diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py index ac5bba4606..6b434933dd 100644 --- a/tests/models/test_trigger.py +++ b/tests/models/test_trigger.py @@ -140,16 +140,17 @@ def test_assign_unassigned(session, create_task_instance): """ Tests that unassigned triggers of all appropriate states are assigned. 
+ """ - finished_triggerer = Job(heartrate=10, state=State.SUCCESS) + triggerer_heartrate = 10 + finished_triggerer = Job(heartrate=triggerer_heartrate, state=State.SUCCESS) TriggererJobRunner(finished_triggerer) finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1) session.add(finished_triggerer) assert not finished_triggerer.is_alive() - healthy_triggerer = Job(heartrate=10, state=State.RUNNING) + healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING) TriggererJobRunner(healthy_triggerer) session.add(healthy_triggerer) assert healthy_triggerer.is_alive() - new_triggerer = Job(heartrate=10, state=State.RUNNING) + new_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING) TriggererJobRunner(new_triggerer) session.add(new_triggerer) assert new_triggerer.is_alive() @@ -168,7 +169,7 @@ def test_assign_unassigned(session, create_task_instance): session.add(trigger_unassigned_to_triggerer) session.commit() assert session.query(Trigger).count() == 3 - Trigger.assign_unassigned(new_triggerer.id, 100, session=session) + Trigger.assign_unassigned(new_triggerer.id, 100, session=session, heartrate=triggerer_heartrate) session.expire_all() # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer assert ( @@ -184,3 +185,58 @@ def test_assign_unassigned(session, create_task_instance): session.query(Trigger).filter(Trigger.id == trigger_on_healthy_triggerer.id).one().triggerer_id == healthy_triggerer.id ) + + [email protected]("check_triggerer_heartrate", [10, 60, 300]) +def test_assign_unassigned_missing_heartbeat(session, create_task_instance, check_triggerer_heartrate): + """ + Tests that the triggers assigned to a dead triggers are considered as unassigned + and they are assigned to an alive triggerer.
+ """ + import time_machine + + block_triggerer_heartrate = 9999 + with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t: + first_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING) + TriggererJobRunner(first_triggerer) + session.add(first_triggerer) + assert first_triggerer.is_alive() + second_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING) + TriggererJobRunner(second_triggerer) + session.add(second_triggerer) + assert second_triggerer.is_alive() + session.commit() + trigger_on_first_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + trigger_on_first_triggerer.id = 1 + trigger_on_first_triggerer.triggerer_id = first_triggerer.id + trigger_on_second_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + trigger_on_second_triggerer.id = 2 + trigger_on_second_triggerer.triggerer_id = second_triggerer.id + session.add(trigger_on_first_triggerer) + session.add(trigger_on_second_triggerer) + session.commit() + assert session.query(Trigger).count() == 2 + triggers_ids = [ + (first_triggerer.id, second_triggerer.id), + (first_triggerer.id, second_triggerer.id), + (first_triggerer.id, second_triggerer.id), + # Check that after more than 2.1 heartrates, the first triggerer is considered dead + # and the first trigger is assigned to the second triggerer + (second_triggerer.id, second_triggerer.id), + ] + for i in range(4): + Trigger.assign_unassigned( + second_triggerer.id, 100, session=session, heartrate=check_triggerer_heartrate + ) + session.expire_all() + # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer + assert ( + session.query(Trigger).filter(Trigger.id == trigger_on_first_triggerer.id).one().triggerer_id + == triggers_ids[i][0] + ) + assert ( + session.query(Trigger).filter(Trigger.id == trigger_on_second_triggerer.id).one().triggerer_id + == triggers_ids[i][1] + ) + 
t.shift(datetime.timedelta(seconds=check_triggerer_heartrate)) + second_triggerer.latest_heartbeat += datetime.timedelta(seconds=check_triggerer_heartrate)
