kristoffern opened a new issue #19699:
URL: https://github.com/apache/airflow/issues/19699


   ### Apache Airflow version
   
   2.2.2 (latest released)
   
   ### Operating System
   
   apache/airflow:2.2.2-python3.8
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-celery = "2.1.0"
   apache-airflow-providers-papermill = "^2.1.0"
   apache-airflow-providers-postgres = "^2.2.0"
   apache-airflow-providers-google = "^6.1.0"
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker-compose deploys into our GCP k8s cluster
   
   ### What happened
   
   Hi,
   
   we're running Airflow for our ETL pipelines. 
   
   Our pipelines spawn a fair number of DAGs and tasks every morning, and they run in parallel.
   We run Airflow in a k8s cluster in GCP with the Celery executor, and we use Autopilot to dynamically scale the cluster up and down as the workload increases or decreases, which sometimes tears down Airflow workers.
   
   Ever since upgrading to Airflow 2.0 we've had a lot of problems with tasks getting stuck in "queued" or "running", and we've had to clean them up by manually failing the stuck tasks and re-running the DAGs.
   Following the discussions here over the last months, it looks like we haven't been alone :-)
   
   After upgrading to Airflow 2.2.1 we saw a significant decrease in the number of tasks getting stuck (yay!), which we had hoped for given the scheduler bug fixes in that release.
   However, on most mornings we still have a few tasks getting stuck (stuck = task stays in "queued") that require the same manual intervention.
   
   I've started digging in the Airflow DB to see where the discrepancy is, and every time a task gets stuck it's missing a corresponding entry in the "celery_taskmeta" table.
   This is a consistent pattern for the tasks that get stuck for us at this point: the task has rows in the "task_instance", "job", and "dag_run" tables with IDs referencing each other.
   
   But the "external_executor_id" in "task_instance" is missing a corresponding 
entry in the "celery_taskmeta" table. So nothing ever gets executed and the 
task_instance is forever stuck in "queued" and never cleaned up by the 
scheduler.
   
   I can see from "dag_run::last_scheduling_decision" that the scheduler is continuously re-evaluating the DAG run, since the timestamp keeps being updated, so it is at least inspecting it, but it leaves everything in the "queued" state.
   
   The other day I bumped our Airflow to 2.2.2, but we still see the same behavior.
   And finally, whenever we get tasks stuck in "queued" this way, their timestamps usually fall within the same few seconds, and those timestamps correlate with moments when Autopilot scaled down the number of Airflow workers.
   
   Would it be possible (and a good idea?) to include in the scheduler a check whether a "task_instance" row has a corresponding row in "celery_taskmeta", and if it's still missing from "celery_taskmeta" after a given amount of time, clean the task instance up? A rough sketch of what I mean is below.
   After reading about and watching Ash Berlin-Taylor's most excellent deep-dive video on the Airflow scheduler, this does seem like exactly the kind of check that could be added to the scheduler?
   
   Also if there's any data I can dig out and provide for this, don't hesitate 
to let me know.
   
   ### What you expected to happen
   
   I expect orphaned tasks that are in the "queued" state and missing a corresponding entry in celery_taskmeta to be cleaned up and re-executed by the scheduler.
   
   ### How to reproduce
   
   There is currently no deterministic way to reproduce it other than running a large number of tasks and removing a worker at just the right time.
   It happens to a handful of tasks every morning, but we can't reproduce it on demand.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

