This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12e3355632f Fix flaky test_trigger_runner_exception_stops_triggerer
race (#67782)
12e3355632f is described below
commit 12e3355632fee6c2667c0f8cfe5bf40a2fdb0434
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat May 30 18:07:17 2026 +0200
Fix flaky test_trigger_runner_exception_stops_triggerer race (#67782)
The test arms a 0.1s SIGALRM timer and then calls _execute(); the
handler reads job_runner.trigger_runner.pid to kill the subprocess.
But trigger_runner is only assigned inside _execute() after the
supervisor subprocess forks, so on a slow CI runner the timer could
fire first and fail with AttributeError: "'TriggererJobRunner' object
has no attribute 'trigger_runner'".
Initialize trigger_runner to None in __init__ so that signal handlers
(on_kill, _exit_gracefully) and the _execute() finally block never
hit AttributeError. In the test, drop the time.sleep(0.1) that
preceded arming the timer, and have the handler re-arm the timer
when the subprocess has not started yet.
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
airflow-core/src/airflow/jobs/triggerer_job_runner.py | 8 ++++++--
airflow-core/tests/unit/jobs/test_triggerer_job.py | 10 ++++++++--
2 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index cb8e45b343d..38f67a9d43e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -193,6 +193,9 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
raise ValueError(f"Capacity number {capacity!r} is invalid")
self.queues = queues
self.team_name = team_name
+ # Set up only when _execute() starts the subprocess; keep it defined
so that
+ # signal handlers (or other code) firing before startup don't hit
AttributeError.
+ self.trigger_runner: TriggerRunnerSupervisor | None = None
def register_signals(self) -> None:
"""Register signals that stop child processes."""
@@ -256,8 +259,9 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
self.log.info("Waiting for triggers to clean up")
# Tell the subtproc to stop and then wait for it.
# If the user interrupts/terms again, _graceful_exit will allow
them
- # to force-kill here.
- self.trigger_runner.kill(escalation_delay=10, force=True)
+ # to force-kill here. trigger_runner may be None if start() raised.
+ if self.trigger_runner is not None:
+ self.trigger_runner.kill(escalation_delay=10, force=True)
self.log.info("Exited trigger loop")
return None
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index f42c84a2bf5..9a0084f5b14 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1149,13 +1149,19 @@ def test_trigger_runner_exception_stops_triggerer():
import signal
job_runner = TriggererJobRunner(Job())
- time.sleep(0.1)
# Wait 4 seconds for the triggerer to stop
try:
def on_timeout(signum, frame):
- os.kill(job_runner.trigger_runner.pid, signal.SIGKILL)
+ # _execute() sets up trigger_runner asynchronously; on a slow
runner the
+ # timer can fire before the subprocess exists. Re-arm and try
again rather
+ # than dereferencing a not-yet-started runner.
+ runner = job_runner.trigger_runner
+ if runner is None:
+ signal.setitimer(signal.ITIMER_REAL, 0.1)
+ return
+ os.kill(runner.pid, signal.SIGKILL)
signal.signal(signal.SIGALRM, on_timeout)
signal.setitimer(signal.ITIMER_REAL, 0.1)