dstandish opened a new pull request, #33718:
URL: https://github.com/apache/airflow/pull/33718

   Alternative to https://github.com/apache/airflow/pull/32990
   
   resolves https://github.com/apache/airflow/issues/32638
   
   This is a less invasive approach. Essentially, we update 
`BaseOperator.resume_execution` so that when the trigger times out, it raises a 
special exception, `AirflowDeferralTimeout`.
   
   Then, in `BaseSensorOperator.resume_execution`, we re-raise 
`AirflowDeferralTimeout` as an `AirflowSensorTimeout`.
   
   So, if a sensor resumes from a timed-out deferral, then it's interpreted as 
a sensor timeout.
   
   All that is required is for a sensor to add a timeout to the deferral.
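
   The exception translation described above can be sketched in isolation. 
This is a minimal, self-contained illustration and not the code in this PR: the 
`*Sketch` classes and both exception classes are stand-ins defined locally so 
the snippet runs without Airflow, and the `"__fail__"` method name and the 
`"error"` kwarg are assumptions about how a failed deferral is signalled. The 
`"Trigger timeout"` message and the two `raise` statements mirror the traceback 
in the example logs.

```python
class AirflowDeferralTimeout(Exception):
    """Stand-in: a task resumed from a deferral whose trigger timed out."""


class AirflowSensorTimeout(Exception):
    """Stand-in: the exception a sensor raises when it times out."""


class BaseOperatorSketch:
    """Hypothetical, simplified stand-in for BaseOperator."""

    def resume_execution(self, next_method, next_kwargs, context):
        next_kwargs = next_kwargs or {}
        # Assumption: a failed deferral is signalled by a special method name
        # plus an "error" entry saying why the trigger failed.
        if next_method == "__fail__":
            error = next_kwargs.get("error", "Unknown")
            if error == "Trigger timeout":
                raise AirflowDeferralTimeout(error)
            raise RuntimeError(f"Trigger failure: {error}")
        # Otherwise resume by calling the method named at deferral time.
        return getattr(self, next_method)(context, **next_kwargs)


class BaseSensorOperatorSketch(BaseOperatorSketch):
    """Hypothetical stand-in for BaseSensorOperator."""

    def resume_execution(self, next_method, next_kwargs, context):
        try:
            return super().resume_execution(next_method, next_kwargs, context)
        except AirflowDeferralTimeout as e:
            # For a sensor, a timed-out deferral is a sensor timeout.
            raise AirflowSensorTimeout(*e.args) from e

    def execute_complete(self, context, event=None):
        return event


sensor = BaseSensorOperatorSketch()
try:
    sensor.resume_execution("__fail__", {"error": "Trigger timeout"}, {})
except AirflowSensorTimeout as exc:
    # The original deferral timeout is kept as the direct cause.
    print(type(exc.__cause__).__name__, "->", type(exc).__name__)
```

   Chaining with `from e` keeps the original `AirflowDeferralTimeout` as the 
direct cause, which is why both tracebacks appear in the logs. In a real 
sensor, the timeout itself would presumably be supplied when deferring, for 
example through the `timeout` argument of `self.defer(...)`.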
   
   Example logs:
   
   ```
   ...
   [2023-08-25, 07:00:54 UTC] {taskinstance.py:1357} INFO - Resuming after 
deferral
   [2023-08-25, 07:00:54 UTC] {taskinstance.py:1380} INFO - Executing 
<Task(TimeDeltaSensorAsync): delta_sensor> on 2023-08-25 07:00:32.864128+00:00
   [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:57} INFO - Started 
process 50543 to run task
   [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'simple2', 'delta_sensor', 
'manual__2023-08-25T07:00:32.864128+00:00', '--job-id', '129', '--raw', 
'--subdir', 
'/Users/dstandish/code/airflow/airflow/example_dags/async_timeout.py', 
'--cfg-path', '/var/folders/9c/tknx7xx10qx92983y1r5djb40000gn/T/tmp72slvjz_']
   [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:85} INFO - Job 129: 
Subtask delta_sensor
   [2023-08-25, 07:00:54 UTC] {task_command.py:415} INFO - Running 
<TaskInstance: simple2.delta_sensor manual__2023-08-25T07:00:32.864128+00:00 
[running]> on host daniels-mbp.lan
   [2023-08-25, 07:00:54 UTC] {taskinstance.py:1933} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 288, in 
resume_execution
       return super().resume_execution(next_method, next_kwargs, context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/dstandish/code/airflow/airflow/models/baseoperator.py", line 
1605, in resume_execution
       raise AirflowDeferralTimeout(error)
   airflow.exceptions.AirflowDeferralTimeout: Trigger timeout
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
     File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 290, in 
resume_execution
       raise AirflowSensorTimeout(*e.args) from e
   airflow.exceptions.AirflowSensorTimeout: Trigger timeout
   [2023-08-25, 07:00:54 UTC] {taskinstance.py:1398} INFO - Immediate failure 
requested. Marking task as FAILED. dag_id=simple2, task_id=delta_sensor, 
execution_date=20230825T070032, start_date=20230825T070034, 
end_date=20230825T070054
   [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:104} ERROR - Failed to 
execute job 129 for task delta_sensor (Trigger timeout; 50543)
   [2023-08-25, 07:00:54 UTC] {local_task_job_runner.py:228} INFO - Task exited 
with return code 1
   [2023-08-25, 07:00:54 UTC] {taskinstance.py:2774} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:611} ERROR - Trigger 
cancelled due to timeout
   [2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:612} ERROR - Trigger 
cancelled; message=
   ```


