dstandish opened a new pull request, #33718: URL: https://github.com/apache/airflow/pull/33718
Alternative to https://github.com/apache/airflow/pull/32990 resolves https://github.com/apache/airflow/issues/32638 This is a less invasive approach. Essentially, what we do here is, update `BaseOperator.resume_execution` so that when trigger times out then it raises special exception `AirflowDeferralTimeout`. Then, `BaseSensorOperator.resume_execution`, we reraise `AirflowDeferralTimeout` as a `AirflowSensorTimeout`. So, if a sensor resumes from a timed-out deferral, then it's interpreted as a sensor timeout. All that is required is for a sensor to add a timeout to the deferral. Example logs: ``` ... [2023-08-25, 07:00:54 UTC] {taskinstance.py:1357} INFO - Resuming after deferral [2023-08-25, 07:00:54 UTC] {taskinstance.py:1380} INFO - Executing <Task(TimeDeltaSensorAsync): delta_sensor> on 2023-08-25 07:00:32.864128+00:00 [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:57} INFO - Started process 50543 to run task [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'simple2', 'delta_sensor', 'manual__2023-08-25T07:00:32.864128+00:00', '--job-id', '129', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/async_timeout.py', '--cfg-path', '/var/folders/9c/tknx7xx10qx92983y1r5djb40000gn/T/tmp72slvjz_'] [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:85} INFO - Job 129: Subtask delta_sensor [2023-08-25, 07:00:54 UTC] {task_command.py:415} INFO - Running <TaskInstance: simple2.delta_sensor manual__2023-08-25T07:00:32.864128+00:00 [running]> on host daniels-mbp.lan [2023-08-25, 07:00:54 UTC] {taskinstance.py:1933} ERROR - Task failed with exception Traceback (most recent call last): File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 288, in resume_execution return super().resume_execution(next_method, next_kwargs, context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/dstandish/code/airflow/airflow/models/baseoperator.py", line 1605, in resume_execution raise AirflowDeferralTimeout(error) airflow.exceptions.AirflowDeferralTimeout: Trigger timeout The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 290, in resume_execution raise AirflowSensorTimeout(*e.args) from e airflow.exceptions.AirflowSensorTimeout: Trigger timeout [2023-08-25, 07:00:54 UTC] {taskinstance.py:1398} INFO - Immediate failure requested. Marking task as FAILED. dag_id=simple2, task_id=delta_sensor, execution_date=20230825T070032, start_date=20230825T070034, end_date=20230825T070054 [2023-08-25, 07:00:54 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 129 for task delta_sensor (Trigger timeout; 50543) [2023-08-25, 07:00:54 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1 [2023-08-25, 07:00:54 UTC] {taskinstance.py:2774} INFO - 0 downstream tasks scheduled from follow-on schedule check [2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:611} ERROR - Trigger cancelled due to timeout [2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:612} ERROR - Trigger cancelled; message= ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
