This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 0522dc0e9f4 [v3-1-test] BUGFIX: Fixed timeout_after in run_trigger
method of TriggerRunner (#58282) (#58703)
0522dc0e9f4 is described below
commit 0522dc0e9f4ddd3fd2b95e4f70595dcfcf3ae042
Author: Wei Lee <[email protected]>
AuthorDate: Wed Nov 26 12:55:55 2025 +0800
[v3-1-test] BUGFIX: Fixed timeout_after in run_trigger method of
TriggerRunner (#58282) (#58703)
Co-authored-by: David Blain <[email protected]>
---
airflow-core/src/airflow/jobs/triggerer_job_runner.py | 7 ++++---
airflow-core/tests/unit/jobs/test_triggerer_job.py | 7 +++++--
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 617643c3f24..292c18f8799 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -27,6 +27,7 @@ import time
from collections import deque
from collections.abc import Generator, Iterable
from contextlib import suppress
+from datetime import datetime
from socket import socket
from traceback import format_exception
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
@@ -962,7 +963,7 @@ class TriggerRunner:
)
self.triggers[trigger_id] = {
"task": asyncio.create_task(
- self.run_trigger(trigger_id, trigger_instance),
name=trigger_name
+ self.run_trigger(trigger_id, trigger_instance,
workload.timeout_after), name=trigger_name
),
"is_watcher": isinstance(trigger_instance,
events.BaseEventTrigger),
"name": trigger_name,
@@ -1097,7 +1098,7 @@ class TriggerRunner:
)
Stats.incr("triggers.blocked_main_thread")
- async def run_trigger(self, trigger_id, trigger):
+ async def run_trigger(self, trigger_id: int, trigger: BaseTrigger,
timeout_after: datetime | None = None):
"""Run a trigger (they are async generators) and push their events
into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower()
== "true":
import greenback
@@ -1118,7 +1119,7 @@ class TriggerRunner:
except asyncio.CancelledError:
# We get cancelled by the scheduler changing the task state. But
if we do lets give a nice error
# message about it
- if timeout := trigger.timeout_after:
+ if timeout := timeout_after:
timeout = timeout.replace(tzinfo=timezone.utc) if not
timeout.tzinfo else timeout
if timeout < timezone.utcnow():
await self.log.aerror("Trigger cancelled due to timeout")
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 14302e36a83..3181afa540c 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -325,11 +325,14 @@ class TestTriggerRunner:
1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name",
"events": 0}
}
mock_trigger = MagicMock(spec=BaseTrigger)
- mock_trigger.timeout_after = timezone.utcnow() -
datetime.timedelta(hours=1)
mock_trigger.run.side_effect = asyncio.CancelledError()
with pytest.raises(asyncio.CancelledError):
- asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
+ asyncio.run(
+ trigger_runner.run_trigger(
+ 1, mock_trigger, timeout_after=timezone.utcnow() -
datetime.timedelta(hours=1)
+ )
+ )
assert {"event": "Trigger cancelled due to timeout", "log_level":
"error"} in cap_structlog
@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")