benrifkind opened a new issue, #43448:
URL: https://github.com/apache/airflow/issues/43448

   ### Apache Airflow version
   
   2.10.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I am running Airflow 2.10.2 on an AWS EKS cluster with the CeleryExecutor. I ran into an issue where the Airflow workers were dying because they were running out of memory (OOM-killed). The workers had triggered many KubernetesPodOperator tasks that were still running when the workers died. The workers would restart, but the Airflow scheduler would get stuck in a bad state, believing it had no slots available even after the tasks triggered by the now-dead worker had completed. New tasks would fail to get scheduled, and I would see repeated logs like
   ```
   [2024-10-23T18:37:10.786+0000] {base_executor.py:292} INFO - Executor 
parallelism limit reached. 0 open slots.
   ```
   Looking at the logs, the scheduler would eventually detect these as zombie tasks and set them to failed/up_for_retry, but it still would not free up slots to schedule new tasks:
   ```
    ERROR - Detected zombie job:
   ```
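
   My rough mental model of why the slots never come back (a toy sketch, not the actual `base_executor.py` code): the executor keeps the task keys it has handed to Celery in an in-memory `running` set and derives open slots from `AIRFLOW__CORE__PARALLELISM` minus the size of that set. When the worker dies, nothing ever removes those keys, so the count never recovers even though the zombie handling updates the task instances in the database:

   ```python
   # Toy model of the slot accounting (simplified; all names are illustrative only).
   PARALLELISM = 4  # AIRFLOW__CORE__PARALLELISM


   class ToyExecutor:
       def __init__(self, parallelism: int) -> None:
           self.parallelism = parallelism
           self.running: set[str] = set()  # task keys the executor believes are still running

       @property
       def open_slots(self) -> int:
           return self.parallelism - len(self.running)


   executor = ToyExecutor(PARALLELISM)

   # The worker picks up 4 tasks, then the worker pod is OOM-killed.
   executor.running |= {f"task_{i}" for i in range(4)}

   # The pods finish on their own and the zombie detector marks the task
   # instances failed/up_for_retry in the metadata DB, but nothing removes
   # the keys from `executor.running`, so the scheduler keeps logging
   # "Executor parallelism limit reached. 0 open slots."
   print(executor.open_slots)  # 0, until the scheduler is restarted
   ```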
   
   
   
   ### What you think should happen instead?
   
   If the scheduler detects that tasks are zombies and sets them to failed/up_for_retry, it should also free up the executor slots those tasks are occupying.
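
   In terms of the toy sketch above (again, just the behaviour I would expect, not a claim about where in the scheduler/executor code the fix belongs):

   ```python
   def handle_zombie(running: set[str], key: str) -> None:
       # ...mark the task instance failed / up_for_retry in the metadata DB...
       running.discard(key)  # and release the executor slot the dead worker was holding


   running = {f"task_{i}" for i in range(4)}  # slots held by the dead worker
   handle_zombie(running, "task_0")
   print(4 - len(running))  # 1 open slot again (with AIRFLOW__CORE__PARALLELISM = 4)
   ```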
   
   ### How to reproduce
   
   Deploy Airflow on Kubernetes with AIRFLOW__CORE__PARALLELISM set to a low number and a single worker whose concurrency is greater than or equal to AIRFLOW__CORE__PARALLELISM. Then create a DAG that runs the maximum number of KubernetesPodOperator tasks concurrently. Wait until the tasks have started, then force-delete the worker with
   ```
   kubectl delete pod airflow-worker-0 --grace-period=0 --force
   ```
   The task pods will continue running and succeed. The scheduler will register these tasks as zombies and mark them as failed, but it will not open up the slots they were occupying, so it gets stuck. Restarting the scheduler unsticks it.
   
   Note also that the worker does not have to be maxed out relative to AIRFLOW__CORE__PARALLELISM to create this issue. Whatever number of tasks the worker is running when it fails will occupy scheduler slots indefinitely. I checked this by killing the worker several times while it was running fewer than `AIRFLOW__CORE__PARALLELISM` tasks; the leaked slots accumulate and eventually recreate the issue once all the scheduler slots are taken up.
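
   Because the kill has to be repeated to accumulate leaked slots, I found it convenient to script the force-delete with the official `kubernetes` Python client (roughly equivalent to the `kubectl` command above; the pod name, namespace, and wait time below are just what my deployment happens to use):

   ```python
   import time

   from kubernetes import client, config

   config.load_kube_config()  # or config.load_incluster_config() when run inside the cluster
   v1 = client.CoreV1Api()

   # Force-delete the Celery worker while its tasks are still running, a few
   # times in a row, to leak more and more executor slots.
   for _ in range(3):
       v1.delete_namespaced_pod(
           name="airflow-worker-0",  # adjust to your worker pod
           namespace="airflow",      # adjust to your namespace
           grace_period_seconds=0,
       )
       time.sleep(120)  # give the StatefulSet time to restart the worker and pick up new tasks
   ```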
   
   
   
   Here's the configuration and DAG I used to reproduce the issue:
   
   ```
       AIRFLOW__CORE__PARALLELISM: 4
       AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 120
       AIRFLOW__CELERY__WORKER_CONCURRENCY: 4
   ```
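
   As a back-of-the-envelope check of how these settings interact (assuming, as described above, that every task the worker holds when it dies leaks one slot):

   ```python
   parallelism = 4          # AIRFLOW__CORE__PARALLELISM -> total executor slots
   worker_concurrency = 4   # AIRFLOW__CELERY__WORKER_CONCURRENCY

   # One crash at full concurrency strands every slot at once:
   print(parallelism - min(worker_concurrency, parallelism))  # 0 open slots

   # Partial crashes accumulate: two crashes while running 2 tasks each
   # leak 2 + 2 = 4 slots and produce the same "0 open slots" hang.
   ```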
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   from kubernetes.client.models import V1ResourceRequirements

   dag = DAG(
       dag_id="high_task_dag_1",
       schedule="* * * * *",  # every minute (schedule_interval is deprecated)
       start_date=datetime(2024, 10, 1),
       max_active_runs=1,
       max_active_tasks=4,  # matches AIRFLOW__CORE__PARALLELISM (concurrency is deprecated)
       catchup=False,
   )

   empty_operator = EmptyOperator(task_id="start", dag=dag)

   # Ten pods that each sleep for 30 seconds, enough to keep all executor
   # slots busy while the worker is killed.
   for i in range(10):
       (
           KubernetesPodOperator(
               dag=dag,
               task_id=f"task_{i}",
               cmds=["sleep"],
               arguments=["30"],
               image="busybox:latest",
               container_resources=V1ResourceRequirements(
                   requests={"cpu": "0.1", "memory": "10Mi"},
                   limits={"cpu": "0.1", "memory": "10Mi"},
               ),
               service_account_name="salesman",
           )
           >> empty_operator
       )
   ```
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==8.28.0
   apache-airflow-providers-celery==3.8.1
   apache-airflow-providers-cncf-kubernetes==8.4.1
   apache-airflow-providers-common-compat==1.2.0
   apache-airflow-providers-common-io==1.4.0
   apache-airflow-providers-common-sql==1.16.0
   apache-airflow-providers-docker==3.13.0
   apache-airflow-providers-elasticsearch==5.5.0
   apache-airflow-providers-fab==1.3.0
   apache-airflow-providers-ftp==3.11.0
   apache-airflow-providers-google==10.22.0
   apache-airflow-providers-grpc==3.6.0
   apache-airflow-providers-hashicorp==3.8.0
   apache-airflow-providers-http==4.13.0
   apache-airflow-providers-imap==3.7.0
   apache-airflow-providers-microsoft-azure==10.4.0
   apache-airflow-providers-mysql==5.7.0
   apache-airflow-providers-odbc==4.7.0
   apache-airflow-providers-openlineage==1.11.0
   apache-airflow-providers-postgres==5.12.0
   apache-airflow-providers-redis==3.8.0
   apache-airflow-providers-sendgrid==3.6.0
   apache-airflow-providers-sftp==4.11.0
   apache-airflow-providers-slack==8.9.0
   apache-airflow-providers-smtp==1.8.0
   apache-airflow-providers-snowflake==5.7.0
   apache-airflow-providers-sqlite==3.9.0
   apache-airflow-providers-ssh==3.13.1
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   I am using this helm chart 
https://github.com/airflow-helm/charts/tree/main/charts/airflow
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

