vincbeck commented on PR #44369:
URL: https://github.com/apache/airflow/pull/44369#issuecomment-2501840580
> On trigger restart or reassignment to another triggerer process, the
> coroutine is cancelled and `trigger_timeout` is read from `task_instance`.
> In this case `task_instance` is None, so the attribute access fails. A check
> like the sample patch below would handle this.
>
> Traceback from trying this out locally with the sample DAG and pressing
> Ctrl+C to stop the triggerer:
>
> ```
> [2024-11-26T20:58:28.870+0530] {base_events.py:1744} ERROR - unhandled exception during asyncio.run() shutdown
> task: <Task finished name='Task-3' coro=<TriggerRunner.run_trigger() done, defined at /home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py:632> exception=AttributeError("'NoneType' object has no attribute 'trigger_timeout'")>
> Traceback (most recent call last):
>   File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 639, in run_trigger
>     async for event in trigger.run():
>   File "/home/karthikeyan/stuff/python/airflow/airflow/triggers/file.py", line 87, in run
>     await asyncio.sleep(self.poke_interval)
>   File "/usr/lib/python3.10/asyncio/tasks.py", line 605, in sleep
>     return await future
> asyncio.exceptions.CancelledError
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 644, in run_trigger
>     if timeout := trigger.task_instance.trigger_timeout:
> AttributeError: 'NoneType' object has no attribute 'trigger_timeout'
> ```
>
> Patch that checks for `task_instance` on the trigger:
>
> ```diff
> diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
> index 8c226334f7..d7e5dbc6b1 100644
> --- a/airflow/jobs/triggerer_job_runner.py
> +++ b/airflow/jobs/triggerer_job_runner.py
> @@ -641,7 +641,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
>                  self.triggers[trigger_id]["events"] += 1
>                  self.events.append((trigger_id, event))
>          except asyncio.CancelledError:
> -            if timeout := trigger.task_instance.trigger_timeout:
> +            if timeout := (trigger.task_instance and trigger.task_instance.trigger_timeout):
>                  timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
>                  if timeout < timezone.utcnow():
>                      self.log.error("Trigger cancelled due to timeout")
> ```
>
> Sample DAG:
>
> ```python
> from __future__ import annotations
>
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.empty import EmptyOperator
> from airflow.triggers.file import FileTrigger
> from airflow.sdk.definitions.asset import Asset
>
> trigger = FileTrigger(filepath="/tmp/a")
> asset = Asset("test_asset_1", watchers=[trigger])
>
> with DAG(
>     dag_id="file_trigger_timeout",
>     start_date=datetime(2021, 1, 1),
>     catchup=False,
>     schedule=[asset],
> ) as dag:
>     t1 = EmptyOperator(task_id="t1")
>
> t1
> ```
It should be fixed now
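
For reference, a minimal standalone sketch of the guard from the quoted patch, runnable without Airflow. It is an illustration under assumptions, not the code in this PR: `FakeTrigger` and `cancel_and_collect` are hypothetical names, `SimpleNamespace` stands in for a task instance, and `datetime.now(timezone.utc)` replaces Airflow's `timezone.utcnow()`.

```python
import asyncio
from datetime import datetime, timezone
from types import SimpleNamespace


class FakeTrigger:
    """Hypothetical stand-in for a trigger; task_instance may be None."""

    def __init__(self, task_instance=None):
        self.task_instance = task_instance

    async def run(self):
        # Never fires on its own; it only exists to be cancelled mid-sleep.
        while True:
            await asyncio.sleep(3600)
            yield "event"


async def run_trigger(trigger, log):
    try:
        async for event in trigger.run():
            log.append(f"event: {event}")
    except asyncio.CancelledError:
        ti = trigger.task_instance
        # Guard: short-circuit when there is no task instance
        # (the asset-watcher case from the traceback).
        if timeout := (ti and ti.trigger_timeout):
            if timeout.tzinfo is None:
                timeout = timeout.replace(tzinfo=timezone.utc)
            if timeout < datetime.now(timezone.utc):
                log.append("Trigger cancelled due to timeout")
        raise


async def cancel_and_collect(trigger):
    """Start the trigger, cancel it, and return what was logged."""
    log = []
    task = asyncio.create_task(run_trigger(trigger, log))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled cleanly")
    return log


if __name__ == "__main__":
    # No task instance: the guard avoids the AttributeError.
    print(asyncio.run(cancel_and_collect(FakeTrigger())))
    # Task instance whose timeout is already in the past.
    expired = SimpleNamespace(trigger_timeout=datetime(2000, 1, 1))
    print(asyncio.run(cancel_and_collect(FakeTrigger(expired))))
```

Without the `ti and ...` guard, the first call would raise `AttributeError` inside the `except` block, which is the failure shown in the traceback.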