This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e585b588fc Ensure that main triggerer thread exits if the async thread fails (#32092)
e585b588fc is described below
commit e585b588fc49b1b1c73a8952e9b257d7a9e13314
Author: tomrutter <[email protected]>
AuthorDate: Tue Jun 27 23:46:16 2023 +0200
Ensure that main triggerer thread exits if the async thread fails (#32092)
Co-authored-by: tom.rutter <[email protected]>
---
airflow/jobs/triggerer_job_runner.py | 29 +++++++++++++++---------
tests/jobs/test_triggerer_job.py | 43 ++++++++++++++++++++++++++++++++++++
2 files changed, 61 insertions(+), 11 deletions(-)
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 91bd3dbd02..32435cc1e1 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -351,6 +351,9 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
This runs synchronously and handles all database reads/writes.
"""
while not self.trigger_runner.stop:
+ if not self.trigger_runner.is_alive():
+ self.log.error("Trigger runner thread has died! Exiting.")
+ break
# Clean out unused triggers
Trigger.clean_unused()
# Load/delete triggers
@@ -470,17 +473,21 @@ class TriggerRunner(threading.Thread, LoggingMixin):
watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
while not self.stop:
- # Run core logic
- await self.create_triggers()
- await self.cancel_triggers()
- await self.cleanup_finished_triggers()
- # Sleep for a bit
- await asyncio.sleep(1)
- # Every minute, log status
- if time.time() - last_status >= 60:
- count = len(self.triggers)
- self.log.info("%i triggers currently running", count)
- last_status = time.time()
+ try:
+ # Run core logic
+ await self.create_triggers()
+ await self.cancel_triggers()
+ await self.cleanup_finished_triggers()
+ # Sleep for a bit
+ await asyncio.sleep(1)
+ # Every minute, log status
+ if time.time() - last_status >= 60:
+ count = len(self.triggers)
+ self.log.info("%i triggers currently running", count)
+ last_status = time.time()
+ except Exception:
+ self.stop = True
+ raise
# Wait for watchdog to complete
await watchdog
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index c3b4e69700..35ebe99b31 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -461,6 +461,49 @@ def test_trigger_from_expired_triggerer(session):
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]
+def test_trigger_runner_exception_stops_triggerer(session):
+ """
+ Checks that if an exception occurs when creating triggers, that the triggerer
+ process stops
+ """
+
+ class MockTriggerException(Exception):
+ pass
+
+ class TriggerRunner_(TriggerRunner):
+ async def create_triggers(self):
+ raise MockTriggerException("Trigger creation failed")
+
+ # Use a trigger that will immediately succeed
+ trigger = SuccessTrigger()
+ create_trigger_in_db(session, trigger)
+
+ # Make a TriggererJobRunner and have it retrieve DB tasks
+ job = Job()
+ job_runner = TriggererJobRunner(job)
+ job_runner.trigger_runner = TriggerRunner_()
+ thread = Thread(target=job_runner._execute)
+ thread.start()
+
+ # Wait 4 seconds for the triggerer to stop
+ try:
+ for _ in range(40):
+ time.sleep(0.1)
+ if not thread.is_alive():
+ break
+ else:
+ pytest.fail("TriggererJobRunner did not stop after exception in TriggerRunner")
+
+ if not job_runner.trigger_runner.stop:
+ pytest.fail("TriggerRunner not marked as stopped after exception in TriggerRunner")
+
+ finally:
+ job_runner.trigger_runner.stop = True
+ # with suppress(MockTriggerException):
+ job_runner.trigger_runner.join()
+ thread.join()
+
+
def test_trigger_firing(session):
"""
Checks that when a trigger fires, it correctly makes it into the