This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d6cc9e4bb1efe9713eccd8e62e46f11bad294a36
Author: tomrutter <[email protected]>
AuthorDate: Tue Jun 27 23:46:16 2023 +0200

    Ensure that main triggerer thread exits if the async thread fails (#32092)

    Co-authored-by: tom.rutter <[email protected]>
    (cherry picked from commit e585b588fc49b1b1c73a8952e9b257d7a9e13314)
---
 airflow/jobs/triggerer_job_runner.py | 29 +++++++++++++++---------
 tests/jobs/test_triggerer_job.py     | 43 ++++++++++++++++++++++++++++++++++++
 2 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 0d56244709..7051dec5a0 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -350,6 +350,9 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
         This runs synchronously and handles all database reads/writes.
         """
         while not self.trigger_runner.stop:
+            if not self.trigger_runner.is_alive():
+                self.log.error("Trigger runner thread has died! Exiting.")
+                break
             # Clean out unused triggers
             Trigger.clean_unused()
             # Load/delete triggers
@@ -466,17 +469,21 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         watchdog = asyncio.create_task(self.block_watchdog())
         last_status = time.time()
         while not self.stop:
-            # Run core logic
-            await self.create_triggers()
-            await self.cancel_triggers()
-            await self.cleanup_finished_triggers()
-            # Sleep for a bit
-            await asyncio.sleep(1)
-            # Every minute, log status
-            if time.time() - last_status >= 60:
-                count = len(self.triggers)
-                self.log.info("%i triggers currently running", count)
-                last_status = time.time()
+            try:
+                # Run core logic
+                await self.create_triggers()
+                await self.cancel_triggers()
+                await self.cleanup_finished_triggers()
+                # Sleep for a bit
+                await asyncio.sleep(1)
+                # Every minute, log status
+                if time.time() - last_status >= 60:
+                    count = len(self.triggers)
+                    self.log.info("%i triggers currently running", count)
+                    last_status = time.time()
+            except Exception:
+                self.stop = True
+                raise
         # Wait for watchdog to complete
         await watchdog

diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 0e19927f17..dd35dd4cc8 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -412,6 +412,49 @@ def test_trigger_from_expired_triggerer(session):
     assert [x for x, y in job_runner.trigger_runner.to_create] == [1]


+def test_trigger_runner_exception_stops_triggerer(session):
+    """
+    Checks that if an exception occurs when creating triggers, that the triggerer
+    process stops
+    """
+
+    class MockTriggerException(Exception):
+        pass
+
+    class TriggerRunner_(TriggerRunner):
+        async def create_triggers(self):
+            raise MockTriggerException("Trigger creation failed")
+
+    # Use a trigger that will immediately succeed
+    trigger = SuccessTrigger()
+    create_trigger_in_db(session, trigger)
+
+    # Make a TriggererJobRunner and have it retrieve DB tasks
+    job = Job()
+    job_runner = TriggererJobRunner(job)
+    job_runner.trigger_runner = TriggerRunner_()
+    thread = Thread(target=job_runner._execute)
+    thread.start()
+
+    # Wait 4 seconds for the triggerer to stop
+    try:
+        for _ in range(40):
+            time.sleep(0.1)
+            if not thread.is_alive():
+                break
+        else:
+            pytest.fail("TriggererJobRunner did not stop after exception in TriggerRunner")
+
+        if not job_runner.trigger_runner.stop:
+            pytest.fail("TriggerRunner not marked as stopped after exception in TriggerRunner")
+
+    finally:
+        job_runner.trigger_runner.stop = True
+        # with suppress(MockTriggerException):
+        job_runner.trigger_runner.join()
+        thread.join()
+
+
 def test_trigger_firing(session):
     """
     Checks that when a trigger fires, it correctly makes it into the
