llamageddon83 opened a new issue, #32928: URL: https://github.com/apache/airflow/issues/32928
### Apache Airflow version Other Airflow 2 version (please specify below) ### What happened We are running Airflow on EKS with version 2.5.3. Airflow has been experiencing progressive slowness over a period of 2-3 weeks where DAGs start getting queued without ever executing and leads us to restart the scheduler pod. After the pod restart, problem goes away for a few days and then starts to slowly creep back up. The pods, the logs and the dashboards all look healthy, the UI shows that no tasks are currently running, and that there are no worker pods alive. The resource usage graphs (CPU, memory) also look what they should if no DAGs are actually executing. During one such outage, we disabled all the DAGs and marked all the tasks as success just to see if scheduler is able to spin up new worker pods. Scheduler never recovered and we restarted the scheduler pod. However, there is one dashboard that shows metrics named `Executor running tasks` and `Executor open slots`. We noticed that this dashboard was accurately representing the slowness behavior. Over a period of time, number of open slots would decrease and vice versa for running tasks. These two would never reset even when nothing is running during a long period of time which is every day between 10:00 PM to 8:00 AM. These [metrics](https://github.com/apache/airflow/blob/2.5.3/airflow/executors/base_executor.py#L166) are coming from `base_exeuctor` : ```python Stats.gauge("executor.open_slots", open_slots) Stats.gauge("executor.queued_tasks", num_queued_tasks) Stats.gauge("executor.running_tasks", num_running_tasks) ``` and `num_running_tasks` is defined as `num_running_tasks = len(self.running)` in `base_executor`. <img width="860" alt="Screenshot 2023-07-28 at 3 11 30 PM" src="https://github.com/apache/airflow/assets/108699169/4f8bfade-36b2-41c5-9e33-1fc0c45e9e36"> So we enabled some logs from `KuberenetesExecutor` under this [method](https://github.com/apache/airflow/blob/2.5.3/airflow/executors/kubernetes_executor.py#L610C17-L610C17) to see what was in `self.running`: ```python def sync(self) -> None: """Synchronize task state.""" #### if self.running: self.log.debug("self.running: %s", self.running) #--> this log ### self.kube_scheduler.sync() ``` where `self.running` is defined as `self.running: set[TaskInstanceKey] = set()`. The log showed that somehow the tasks that have been completed successfully in the past still exist in `self.running`. For example, a snippet of the log outputted on the 28th is holding on to the tasks that have already been successfully completed on the 24th and 27th: ``` **time: Jul 28, 2023 @ 15:07:01.784** self.running: {TaskInstanceKey(dag_id='flight_history.py', task_id='load_file', run_id=**'manual__2023-07-24T01:06:18+00:00'**, try_number=1, map_index=17), TaskInstanceKey(dag_id='emd_load.py', task_id='processing.emd', run_id='**scheduled__2023-07-25T07:30:00+00:00'**, try_number=1, map_index=-1), ``` We validated that these tasks have been completed without any issue from the UI and Postgres DB (which we use as the metadata backend). Once the scheduler pod is restarted, the problem goes away, the metrics in Grafana dashboard reset and tasks start executing. ### What you think should happen instead Airflow's scheduler is keeping a track of currently running tasks and their state in memory. And that state in some cases is not getting cleared. The tasks that have been completed should eventually be cleared from `running` set in `KubernetesExecutor` once the worker pod exits. ### How to reproduce Beats me. Our initial assumption was that that is a DAG implementation issue and some particular DAG is misbehaving. But this problem has occurred with all sorts of DAGs, happens for scheduled and manual runs, and is sporadic. Tt here is some edge scenario that causes this to happen. But we are unable to nail it down any further. ### Operating System Debian GNU/ Linux 11 (bullseye) ### Versions of Apache Airflow Providers aiofiles==23.1.0 aiohttp==3.8.4 airflow-dbt>=0.4.0 airflow-exporter==1.5.3 anytree==2.8.0 apache-airflow-providers-ftp==2.0.1 apache-airflow-providers-http>=2.0.3 apache-airflow-providers-microsoft-mssql==2.1.3 apache-airflow-providers-snowflake>=4.0.4 apache-airflow-providers-hashicorp==3.3.0 apache-airflow-providers-cncf-kubernetes==5.2.2 apache-airflow>=2.2.3 asgiref==3.5.0 Authlib==0.15.5 dbt-snowflake==1.5.2 flatdict==4.0.1 hvac==0.11.2 jsonschema>=4.17.3 pandas==1.3.5 psycopg2-binary==2.9.3 pyOpenSSL==23.1.1 pysftp==0.2.9 pysmbclient==0.1.5 python-gnupg==0.5.0 PyYAML~=5.4.1 requests~=2.26.0 smbprotocol==1.9.0 snowflake-connector-python== 3.0.4 snowflake-sqlalchemy==1.4.7 statsd==3.3.0 py7zr==0.20.5 ### Deployment Official Apache Airflow Helm Chart ### Deployment details Airflow is deployed via helm charts on EKS in AWS. There are two scheduler pods with `AIRFLOW__CORE__PARALLELISM` set to `10`. ### Anything else N/A ### Are you willing to submit PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
