tirkarthi commented on PR #44369:
URL: https://github.com/apache/airflow/pull/44369#issuecomment-2501212903
On trigger restart or reassignment to another triggerer process the
coroutine is cancelled and a check for `trigger_timeout` is done on
`task_instance` where `task_instance` is None in this case and could be checked
with sample patch as below to handle this.
Traceback on trying this out locally with sample dag and ctrl+c to stop the
triggerer
```
[2024-11-26T20:58:28.870+0530] {base_events.py:1744} ERROR - unhandled
exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<TriggerRunner.run_trigger() done,
defined at
/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py:632>
exception=AttributeError("'NoneType' object has no attribute
'trigger_timeout'")>
Traceback (most recent call last):
File
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py",
line 639, in run_trigger
async for event in trigger.run():
File "/home/karthikeyan/stuff/python/airflow/airflow/triggers/file.py",
line 87, in run
await asyncio.sleep(self.poke_interval)
File "/usr/lib/python3.10/asyncio/tasks.py", line 605, in sleep
return await future
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py",
line 644, in run_trigger
if timeout := trigger.task_instance.trigger_timeout:
AttributeError: 'NoneType' object has no attribute 'trigger_timeout'
```
Checking for task_instance on trigger
```patch
diff --git a/airflow/jobs/triggerer_job_runner.py
b/airflow/jobs/triggerer_job_runner.py
index 8c226334f7..d7e5dbc6b1 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -641,7 +641,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
except asyncio.CancelledError:
- if timeout := trigger.task_instance.trigger_timeout:
+ if timeout := (trigger.task_instance and
trigger.task_instance.trigger_timeout):
timeout = timeout.replace(tzinfo=timezone.utc) if not
timeout.tzinfo else timeout
if timeout < timezone.utcnow():
self.log.error("Trigger cancelled due to timeout")
```
Sample dag :
```python
from __future__ import annotations
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.triggers.file import FileTrigger
from airflow.sdk.definitions.asset import Asset
trigger = FileTrigger(filepath="/tmp/a")
asset = Asset("test_asset_1", watchers=[trigger])
with DAG(
dag_id="file_trigger_timeout",
start_date=datetime(2021, 1, 1),
catchup=False,
schedule=[asset],
) as dag:
t1 = EmptyOperator(task_id="t1")
t1
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]