This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 12e3355632f Fix flaky test_trigger_runner_exception_stops_triggerer race (#67782)
12e3355632f is described below

commit 12e3355632fee6c2667c0f8cfe5bf40a2fdb0434
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat May 30 18:07:17 2026 +0200

    Fix flaky test_trigger_runner_exception_stops_triggerer race (#67782)
    
    The test arms a SIGALRM timer for 0.1s, then calls _execute(); the
    handler reads job_runner.trigger_runner.pid to kill the subprocess.
    But trigger_runner is only assigned inside _execute() after the
    supervisor subprocess forks. On a slow CI runner the timer could
    therefore fire before that assignment, and the handler would raise
    AttributeError: 'TriggererJobRunner' object has no attribute
    'trigger_runner'.
    
    Initialize trigger_runner to None in __init__ so that the signal
    handlers (on_kill, _exit_gracefully) and the _execute() finally block
    never hit AttributeError. In the test's SIGALRM handler, re-arm the
    timer instead of killing when the subprocess has not started yet, and
    drop the stray time.sleep(0.1) before arming the timer.
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 airflow-core/src/airflow/jobs/triggerer_job_runner.py |  8 ++++++--
 airflow-core/tests/unit/jobs/test_triggerer_job.py    | 10 ++++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index cb8e45b343d..38f67a9d43e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -193,6 +193,9 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             raise ValueError(f"Capacity number {capacity!r} is invalid")
         self.queues = queues
         self.team_name = team_name
+        # Set up only when _execute() starts the subprocess; keep it defined 
so that
+        # signal handlers (or other code) firing before startup don't hit 
AttributeError.
+        self.trigger_runner: TriggerRunnerSupervisor | None = None
 
     def register_signals(self) -> None:
         """Register signals that stop child processes."""
@@ -256,8 +259,9 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             self.log.info("Waiting for triggers to clean up")
             # Tell the subtproc to stop and then wait for it.
             # If the user interrupts/terms again, _graceful_exit will allow 
them
-            # to force-kill here.
-            self.trigger_runner.kill(escalation_delay=10, force=True)
+            # to force-kill here. trigger_runner may be None if start() raised.
+            if self.trigger_runner is not None:
+                self.trigger_runner.kill(escalation_delay=10, force=True)
             self.log.info("Exited trigger loop")
         return None
 
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index f42c84a2bf5..9a0084f5b14 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1149,13 +1149,19 @@ def test_trigger_runner_exception_stops_triggerer():
     import signal
 
     job_runner = TriggererJobRunner(Job())
-    time.sleep(0.1)
 
     # Wait 4 seconds for the triggerer to stop
     try:
 
         def on_timeout(signum, frame):
-            os.kill(job_runner.trigger_runner.pid, signal.SIGKILL)
+            # _execute() sets up trigger_runner asynchronously; on a slow 
runner the
+            # timer can fire before the subprocess exists. Re-arm and try 
again rather
+            # than dereferencing a not-yet-started runner.
+            runner = job_runner.trigger_runner
+            if runner is None:
+                signal.setitimer(signal.ITIMER_REAL, 0.1)
+                return
+            os.kill(runner.pid, signal.SIGKILL)
 
         signal.signal(signal.SIGALRM, on_timeout)
         signal.setitimer(signal.ITIMER_REAL, 0.1)

Reply via email to