tirkarthi commented on PR #44369:
URL: https://github.com/apache/airflow/pull/44369#issuecomment-2501212903

   On trigger restart or reassignment to another triggerer process the 
coroutine is cancelled and a check for `trigger_timeout` is done on 
`task_instance` where `task_instance` is None in this case and could be checked 
with sample patch as below to handle this.
   
   Traceback on trying this out locally with sample dag and ctrl+c to stop the 
triggerer
   
   ```
   [2024-11-26T20:58:28.870+0530] {base_events.py:1744} ERROR - unhandled 
exception during asyncio.run() shutdown
   task: <Task finished name='Task-3' coro=<TriggerRunner.run_trigger() done, 
defined at 
/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py:632>
 exception=AttributeError("'NoneType' object has no attribute 
'trigger_timeout'")>
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", 
line 639, in run_trigger
       async for event in trigger.run():
     File "/home/karthikeyan/stuff/python/airflow/airflow/triggers/file.py", 
line 87, in run
       await asyncio.sleep(self.poke_interval)
     File "/usr/lib/python3.10/asyncio/tasks.py", line 605, in sleep
       return await future
   asyncio.exceptions.CancelledError
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", 
line 644, in run_trigger
       if timeout := trigger.task_instance.trigger_timeout:
   AttributeError: 'NoneType' object has no attribute 'trigger_timeout'
   ```
   
   Checking for task_instance on trigger
   
   ```patch
   diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
   index 8c226334f7..d7e5dbc6b1 100644
   --- a/airflow/jobs/triggerer_job_runner.py
   +++ b/airflow/jobs/triggerer_job_runner.py
   @@ -641,7 +641,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                    self.triggers[trigger_id]["events"] += 1
                    self.events.append((trigger_id, event))
            except asyncio.CancelledError:
   -            if timeout := trigger.task_instance.trigger_timeout:
   +            if timeout := (trigger.task_instance and 
trigger.task_instance.trigger_timeout):
                    timeout = timeout.replace(tzinfo=timezone.utc) if not 
timeout.tzinfo else timeout
                    if timeout < timezone.utcnow():
                        self.log.error("Trigger cancelled due to timeout")
   ```
   
   Sample dag :
   
   ```python
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.triggers.file import FileTrigger
   from airflow.sdk.definitions.asset import Asset
   
   trigger = FileTrigger(filepath="/tmp/a")
   asset = Asset("test_asset_1", watchers=[trigger])
   
   with DAG(
       dag_id="file_trigger_timeout",
       start_date=datetime(2021, 1, 1),
       catchup=False,
       schedule=[asset],
   ) as dag:
       t1 = EmptyOperator(task_id="t1")
   
       t1
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to