This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1c3ac6c6d0b BUGFIX: Fixed timeout_after in run_trigger method of TriggerRunner (#58282)
1c3ac6c6d0b is described below
commit 1c3ac6c6d0b169cb57f92b6b14d8bd03361adddc
Author: David Blain <[email protected]>
AuthorDate: Tue Nov 25 09:40:56 2025 +0000
BUGFIX: Fixed timeout_after in run_trigger method of TriggerRunner (#58282)
---
airflow-core/src/airflow/jobs/triggerer_job_runner.py | 7 ++++---
airflow-core/tests/unit/jobs/test_triggerer_job.py | 7 +++++--
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index cd8b19622d2..5833593ee82 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -27,6 +27,7 @@ import time
from collections import deque
from collections.abc import Generator, Iterable
from contextlib import suppress
+from datetime import datetime
from socket import socket
from traceback import format_exception
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
@@ -962,7 +963,7 @@ class TriggerRunner:
)
self.triggers[trigger_id] = {
"task": asyncio.create_task(
- self.run_trigger(trigger_id, trigger_instance), name=trigger_name
+ self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name
),
"is_watcher": isinstance(trigger_instance,
events.BaseEventTrigger),
"name": trigger_name,
@@ -1097,7 +1098,7 @@ class TriggerRunner:
)
Stats.incr("triggers.blocked_main_thread")
- async def run_trigger(self, trigger_id, trigger):
+ async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
"""Run a trigger (they are async generators) and push their events
into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower()
== "true":
import greenback
@@ -1118,7 +1119,7 @@ class TriggerRunner:
except asyncio.CancelledError:
# We get cancelled by the scheduler changing the task state. But if we do lets give a nice error
# message about it
- if timeout := trigger.timeout_after:
+ if timeout := timeout_after:
timeout = timeout.replace(tzinfo=timezone.utc) if not
timeout.tzinfo else timeout
if timeout < timezone.utcnow():
await self.log.aerror("Trigger cancelled due to timeout")
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 29a187eb5a8..2dfb735bc34 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -325,11 +325,14 @@ class TestTriggerRunner:
1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name",
"events": 0}
}
mock_trigger = MagicMock(spec=BaseTrigger)
- mock_trigger.timeout_after = timezone.utcnow() - datetime.timedelta(hours=1)
mock_trigger.run.side_effect = asyncio.CancelledError()
with pytest.raises(asyncio.CancelledError):
- asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
+ asyncio.run(
+ trigger_runner.run_trigger(
+ 1, mock_trigger, timeout_after=timezone.utcnow() -
datetime.timedelta(hours=1)
+ )
+ )
assert {"event": "Trigger cancelled due to timeout", "log_level":
"error"} in cap_structlog
@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")