This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c3ac6c6d0b BUGFIX: Fixed timeout_after in run_trigger method of TriggerRunner (#58282)
1c3ac6c6d0b is described below

commit 1c3ac6c6d0b169cb57f92b6b14d8bd03361adddc
Author: David Blain <[email protected]>
AuthorDate: Tue Nov 25 09:40:56 2025 +0000

    BUGFIX: Fixed timeout_after in run_trigger method of TriggerRunner (#58282)
---
 airflow-core/src/airflow/jobs/triggerer_job_runner.py | 7 ++++---
 airflow-core/tests/unit/jobs/test_triggerer_job.py    | 7 +++++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index cd8b19622d2..5833593ee82 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -27,6 +27,7 @@ import time
 from collections import deque
 from collections.abc import Generator, Iterable
 from contextlib import suppress
+from datetime import datetime
 from socket import socket
 from traceback import format_exception
 from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
@@ -962,7 +963,7 @@ class TriggerRunner:
             )
             self.triggers[trigger_id] = {
                 "task": asyncio.create_task(
-                    self.run_trigger(trigger_id, trigger_instance), name=trigger_name
+                    self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name
                 ),
                 "is_watcher": isinstance(trigger_instance, events.BaseEventTrigger),
                 "name": trigger_name,
@@ -1097,7 +1098,7 @@ class TriggerRunner:
                 )
                 Stats.incr("triggers.blocked_main_thread")
 
-    async def run_trigger(self, trigger_id, trigger):
+    async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
         """Run a trigger (they are async generators) and push their events into our outbound event deque."""
         if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
             import greenback
@@ -1118,7 +1119,7 @@ class TriggerRunner:
         except asyncio.CancelledError:
             # We get cancelled by the scheduler changing the task state. But if we do lets give a nice error
             # message about it
-            if timeout := trigger.timeout_after:
+            if timeout := timeout_after:
                 timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
                 if timeout < timezone.utcnow():
                     await self.log.aerror("Trigger cancelled due to timeout")
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 29a187eb5a8..2dfb735bc34 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -325,11 +325,14 @@ class TestTriggerRunner:
             1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", "events": 0}
         }
         mock_trigger = MagicMock(spec=BaseTrigger)
-        mock_trigger.timeout_after = timezone.utcnow() - datetime.timedelta(hours=1)
         mock_trigger.run.side_effect = asyncio.CancelledError()
 
         with pytest.raises(asyncio.CancelledError):
-            asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
+            asyncio.run(
+                trigger_runner.run_trigger(
+                    1, mock_trigger, timeout_after=timezone.utcnow() - datetime.timedelta(hours=1)
+                )
+            )
         assert {"event": "Trigger cancelled due to timeout", "log_level": "error"} in cap_structlog
 
     @patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")

Reply via email to