Birne94 opened a new issue, #44759:
URL: https://github.com/apache/airflow/issues/44759

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.1
   
   ### What happened?
   
   We are using deferrable operators to run jobs in Databricks. These jobs utilize a common database, so we use a task pool to limit the concurrency to 1 task. This pool includes deferred tasks (`Include Deferred` is enabled). In some cases we see task timeouts even though the deferred task finished successfully. In the log below, about 1.5 hours pass between the trigger event and the task being scheduled again:
   
   ```
   [2024-12-06, 14:01:10 CET] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=my-dag, task_id=my-task, execution_date=20241205T130000, start_date=20241206T130108
   [2024-12-06, 14:01:10 CET] {{local_task_job_runner.py:231}} INFO - Task exited with return code 100 (task deferral)
   [2024-12-06, 14:01:11 CET] {{base.py:83}} INFO - Using connection ID 'databricks' for task execution.
   [2024-12-06, 14:01:11 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'PENDING', 'result_state': '', 'state_message': 'Waiting for cluster'}. sleeping for 30 seconds
   ...
   [2024-12-06, 14:09:42 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
   [2024-12-06, 14:10:12 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
   [2024-12-06, 14:10:42 CET] {{triggerer_job_runner.py:602}} INFO - Trigger my-dag/scheduled__2024-12-05T13:00:00+00:00/my-task/-1/1 (ID 10030) fired: TriggerEvent<{'run_id': 847717920033451, 'run_page_url': '...', 'run_state': '{"life_cycle_state": "TERMINATED", "result_state": "SUCCESS", "state_message": ""}'}>
   [2024-12-06, 15:38:27 CET] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: my-dag.my-task scheduled__2024-12-05T13:00:00+00:00 [queued]>
   ...
   [2024-12-06, 15:38:27 CET] {{taskinstance.py:2698}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 425, in _execute_task
       raise AirflowTaskTimeout()
   airflow.exceptions.AirflowTaskTimeout
   ```
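The gap can be quantified directly from the timestamps in the log. The following is a minimal calculation, not anything from Airflow itself: the one-hour `execution_timeout` is a hypothetical value (the real timeout of `my-task` is not shown in the log), and the last step assumes that a resumed task's remaining `execution_timeout` is measured from its original `start_date`, which would be consistent with `AirflowTaskTimeout` being raised immediately on resume.

```python
from datetime import datetime, timedelta

# Log timestamps are all CET, so naive datetimes are enough for differences.
FMT = "%Y-%m-%d, %H:%M:%S"


def ts(text):
    return datetime.strptime(text, FMT)


started = ts("2024-12-06, 14:01:08")   # start_date=20241206T130108 (UTC) in CET
deferred = ts("2024-12-06, 14:01:10")  # "Pausing task as DEFERRED"
fired = ts("2024-12-06, 14:10:42")     # TriggerEvent fired in the triggerer
resumed = ts("2024-12-06, 15:38:27")   # "Dependencies all met ... [queued]"

print(fired - deferred)  # 0:09:32  time the trigger was actually running
print(resumed - fired)   # 1:27:45  time the finished task waited to resume

# Hypothetical value: the real execution_timeout of my-task is not in the log.
execution_timeout = timedelta(hours=1)
# Assumption: the remaining budget is computed from the original start_date.
remaining = (execution_timeout - (resumed - started)).total_seconds()
print(remaining)  # -2239.0, i.e. the budget is already used up on resume
```

Under that assumption, any `execution_timeout` shorter than about 1h37m would already be exhausted by the time the task gets a slot again.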
   Our assumption of what happens is the following:
   
   - Many tasks are waiting to be executed but are limited by the pool.
   - A task starts running and is deferred (pool slot is consumed).
   - The deferred task runs in the triggerer (pool slot is still consumed).
   - The trigger emits its event and stops (pool slot is released).
   - As the pool slot is released, another waiting task starts running (pool slot is consumed again).
   - The post-deferral run of our original task is scheduled but cannot start, because no pool slot is available.
   - After the task that got scheduled in between finishes and releases its slot, the post-deferral run starts and times out immediately.
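The sequence above can be illustrated with a toy simulation. This is a hypothetical model, not Airflow's scheduler: one pool slot, time in minutes, a FIFO queue at the moment the trigger fires, and the same assumption that the timeout budget counts from the task's original start. `minute_of_resume`, `times_out_on_resume`, and `RESUME` are illustrative names only.

```python
# Toy model of the assumed race (hypothetical; not Airflow's scheduler).
# One pool slot with "Include Deferred" enabled; time is measured in minutes.

RESUME = "my-task (resume)"  # marker for the post-deferral run in the queue


def minute_of_resume(waiting, trigger_minutes):
    """Return the minute at which the resumed task finally gets the slot.

    `waiting` is the FIFO queue at the moment the trigger fires: a list of
    (name, minutes_holding_slot) pairs that contains RESUME somewhere.
    my-task started (and deferred) at minute 0.
    """
    clock = trigger_minutes  # the slot is released when the trigger fires
    for name, duration in waiting:
        if name == RESUME:
            return clock
        clock += duration  # another task holds the only slot meanwhile
    raise ValueError("resume entry missing from queue")


def times_out_on_resume(resumed_at, execution_timeout):
    # Assumption: the remaining budget is measured from the original start (minute 0).
    return execution_timeout - resumed_at <= 0


# Another task was queued first and takes the freed slot for 80 minutes.
queue = [("other-task", 80), (RESUME, 0)]
resumed = minute_of_resume(queue, trigger_minutes=10)
print(resumed, times_out_on_resume(resumed, execution_timeout=60))  # 90 True
```

With the resume entry at the front of the queue instead, `minute_of_resume` returns 10 and the task would still have 50 minutes of its budget left.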
   
   ### What you think should happen instead?
   
   I see multiple things that could improve this behavior:
   
   - Tasks resuming after deferral do not consume slots within task pools.
   - Tasks resuming after deferral take priority over other tasks when scheduling decisions are made.
   - Tasks resuming after deferral get their own timeout for the post-deferral execution.
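The second and third suggestions could look roughly like this. It is only a sketch: `QueuedTask`, `slot_order_key`, and `post_deferral_timeout` are hypothetical names, and nothing here reflects how Airflow's scheduler actually orders task instances or enforces `execution_timeout`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class QueuedTask:
    # Hypothetical stand-in for a task instance waiting for a pool slot.
    task_id: str
    priority_weight: int
    queued_at: datetime
    resuming: bool = False  # True if returning from a deferral


def slot_order_key(task):
    # Suggestion 2 (hypothetical): resuming tasks first, then higher
    # priority_weight, then whichever task was queued earlier.
    return (not task.resuming, -task.priority_weight, task.queued_at)


def post_deferral_timeout(resumed_at, now, budget):
    # Suggestion 3 (hypothetical): seconds left in a separate budget that
    # starts when the task resumes, not at its original start_date.
    return (budget - (now - resumed_at)).total_seconds()


t0 = datetime(2024, 12, 6, 14, 10, 42)
queue = [
    QueuedTask("other-task", 1, t0 - timedelta(hours=1)),
    QueuedTask("my-task", 1, t0, resuming=True),
]
print(min(queue, key=slot_order_key).task_id)  # my-task
print(post_deferral_timeout(t0, t0, timedelta(minutes=30)))  # 1800.0
```

Sorting on a tuple keeps the usual ordering (priority, then queue time) among tasks that are not resuming and only moves resuming ones ahead.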
   
   
   ### How to reproduce
   
   - Create a DAG with many deferrable tasks sharing a single task pool.
   - Reduce pool capacity to 1 and enable `Include Deferred`.
   - Observe that sometimes a new task is scheduled before the post-deferral task.
   
   ### Operating System
   
   Amazon Linux 2
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

