This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e585b588fc Ensure that main triggerer thread exits if the async thread fails (#32092)
e585b588fc is described below

commit e585b588fc49b1b1c73a8952e9b257d7a9e13314
Author: tomrutter <[email protected]>
AuthorDate: Tue Jun 27 23:46:16 2023 +0200

    Ensure that main triggerer thread exits if the async thread fails (#32092)
    
    Co-authored-by: tom.rutter <[email protected]>
---
 airflow/jobs/triggerer_job_runner.py | 29 +++++++++++++++---------
 tests/jobs/test_triggerer_job.py     | 43 ++++++++++++++++++++++++++++++++++++
 2 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 91bd3dbd02..32435cc1e1 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -351,6 +351,9 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
         This runs synchronously and handles all database reads/writes.
         """
         while not self.trigger_runner.stop:
+            if not self.trigger_runner.is_alive():
+                self.log.error("Trigger runner thread has died! Exiting.")
+                break
             # Clean out unused triggers
             Trigger.clean_unused()
             # Load/delete triggers
@@ -470,17 +473,21 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         watchdog = asyncio.create_task(self.block_watchdog())
         last_status = time.time()
         while not self.stop:
-            # Run core logic
-            await self.create_triggers()
-            await self.cancel_triggers()
-            await self.cleanup_finished_triggers()
-            # Sleep for a bit
-            await asyncio.sleep(1)
-            # Every minute, log status
-            if time.time() - last_status >= 60:
-                count = len(self.triggers)
-                self.log.info("%i triggers currently running", count)
-                last_status = time.time()
+            try:
+                # Run core logic
+                await self.create_triggers()
+                await self.cancel_triggers()
+                await self.cleanup_finished_triggers()
+                # Sleep for a bit
+                await asyncio.sleep(1)
+                # Every minute, log status
+                if time.time() - last_status >= 60:
+                    count = len(self.triggers)
+                    self.log.info("%i triggers currently running", count)
+                    last_status = time.time()
+            except Exception:
+                self.stop = True
+                raise
         # Wait for watchdog to complete
         await watchdog
 
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index c3b4e69700..35ebe99b31 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -461,6 +461,49 @@ def test_trigger_from_expired_triggerer(session):
     assert [x for x, y in job_runner.trigger_runner.to_create] == [1]
 
 
+def test_trigger_runner_exception_stops_triggerer(session):
+    """
+    Checks that if an exception occurs when creating triggers, the triggerer
+    process stops.
+    """
+
+    class MockTriggerException(Exception):
+        pass
+
+    class TriggerRunner_(TriggerRunner):
+        async def create_triggers(self):
+            raise MockTriggerException("Trigger creation failed")
+
+    # Use a trigger that will immediately succeed
+    trigger = SuccessTrigger()
+    create_trigger_in_db(session, trigger)
+
+    # Make a TriggererJobRunner and have it retrieve DB tasks
+    job = Job()
+    job_runner = TriggererJobRunner(job)
+    job_runner.trigger_runner = TriggerRunner_()
+    thread = Thread(target=job_runner._execute)
+    thread.start()
+
+    # Poll for up to 4 seconds (40 x 0.1s) for the triggerer thread to exit
+    try:
+        for _ in range(40):
+            time.sleep(0.1)
+            if not thread.is_alive():
+                break
+        else:
+            pytest.fail("TriggererJobRunner did not stop after exception in TriggerRunner")
+
+        if not job_runner.trigger_runner.stop:
+            pytest.fail("TriggerRunner not marked as stopped after exception in TriggerRunner")
+
+    finally:
+        job_runner.trigger_runner.stop = True
+        # Thread.join() does not re-raise the runner's exception, so no suppress() is needed
+        job_runner.trigger_runner.join()
+        thread.join()
+
+
 def test_trigger_firing(session):
     """
     Checks that when a trigger fires, it correctly makes it into the

Reply via email to