llamageddon83 opened a new issue, #32928:
URL: https://github.com/apache/airflow/issues/32928

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We are running Airflow 2.5.3 on EKS. Over a period of 2-3 weeks Airflow slows down progressively: DAGs start getting queued without ever executing, which forces us to restart the scheduler pod. After the restart the problem goes away for a few days and then slowly creeps back.
   
   The pods, the logs and the dashboards all look healthy, the UI shows that no tasks are currently running and that there are no worker pods alive, and the resource usage graphs (CPU, memory) look the way they should when no DAGs are actually executing.
   
   During one such outage, we disabled all the DAGs and marked all the tasks as success just to see whether the scheduler could spin up new worker pods. It never recovered, and we had to restart the scheduler pod.
   
   However, there is one dashboard that shows metrics named `Executor running tasks` and `Executor open slots`, and we noticed that it accurately tracks the slowness. Over time the number of open slots decreases while the number of running tasks increases. Neither resets, even during the long window every day between 10:00 PM and 8:00 AM when nothing is running.
   
   These [metrics](https://github.com/apache/airflow/blob/2.5.3/airflow/executors/base_executor.py#L166) come from `base_executor`:
   
   ```python
           Stats.gauge("executor.open_slots", open_slots)
           Stats.gauge("executor.queued_tasks", num_queued_tasks)
           Stats.gauge("executor.running_tasks", num_running_tasks)
   ```
   
   and `num_running_tasks` is defined as `num_running_tasks = 
len(self.running)` in `base_executor`.
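
   For context, these gauges are emitted from `BaseExecutor.heartbeat()`. Paraphrasing the 2.5.3 code around the link above (a sketch, not a verbatim copy), the open-slot count is derived directly from `self.running`:

   ```python
        # Sketch (paraphrased from BaseExecutor.heartbeat() in 2.5.x): slot accounting
        # is based on the in-memory self.running set.
        if not self.parallelism:
            open_slots = len(self.queued_tasks)
        else:
            open_slots = self.parallelism - len(self.running)

        num_running_tasks = len(self.running)
        num_queued_tasks = len(self.queued_tasks)
   ```

   So any `TaskInstanceKey` that never gets removed from `self.running` permanently consumes one executor slot.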
   
   <img width="860" alt="Screenshot 2023-07-28 at 3 11 30 PM" src="https://github.com/apache/airflow/assets/108699169/4f8bfade-36b2-41c5-9e33-1fc0c45e9e36">
   
   So we added a debug log to `KubernetesExecutor` in this [method](https://github.com/apache/airflow/blob/2.5.3/airflow/executors/kubernetes_executor.py#L610C17-L610C17) to see what was in `self.running`:
   
   ```python
    def sync(self) -> None:
        """Synchronize task state."""
        # ... existing code ...
        if self.running:
            self.log.debug("self.running: %s", self.running)  # <-- the log we added
        # ... existing code ...
        self.kube_scheduler.sync()
   ```
   
   where `self.running` is defined as `self.running: set[TaskInstanceKey] = set()`. The log showed that tasks which completed successfully in the past somehow still exist in `self.running`. For example, a snippet of the log output on the 28th is still holding on to tasks that had already completed successfully on the 24th and 27th:
   
   ```
time: Jul 28, 2023 @ 15:07:01.784
self.running: {TaskInstanceKey(dag_id='flight_history.py', task_id='load_file', run_id='manual__2023-07-24T01:06:18+00:00', try_number=1, map_index=17), TaskInstanceKey(dag_id='emd_load.py', task_id='processing.emd', run_id='scheduled__2023-07-25T07:30:00+00:00', try_number=1, map_index=-1), 
   ```
   
   We validated from the UI and the Postgres DB (which we use as the metadata backend) that these tasks completed without any issue.
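
   For reference, one way to cross-check a "stuck" key programmatically against the metadata DB (a minimal sketch using the ORM; the identifiers are just the ones from the log snippet above):

   ```python
from airflow import settings
from airflow.models import TaskInstance

# Look up the task instance that the executor still thinks is running and
# print the state recorded in the metadata DB (expected: "success").
session = settings.Session()
ti = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == "flight_history.py",
        TaskInstance.task_id == "load_file",
        TaskInstance.run_id == "manual__2023-07-24T01:06:18+00:00",
        TaskInstance.map_index == 17,
    )
    .one_or_none()
)
print(ti.state if ti else "not found in metadata DB")
session.close()
   ```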
   
   Once the scheduler pod is restarted, the problem goes away, the metrics in the Grafana dashboard reset, and tasks start executing.
   
   ### What you think should happen instead
   
   Airflow's scheduler keeps track of currently running tasks and their state in memory, and in some cases that state is not getting cleared. Completed tasks should eventually be removed from the `running` set in `KubernetesExecutor` once the worker pod exits.
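
   As far as we can tell, that removal is supposed to happen in `BaseExecutor.change_state()` when the executor processes the pod's completion event. A paraphrased sketch of the 2.5.x cleanup path (not a verbatim copy, details may differ slightly):

   ```python
    def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
        # Paraphrased sketch of BaseExecutor.change_state() in 2.5.x.
        self.log.debug("Changing state: %s", key)
        try:
            # This removal is what apparently never happens for the stuck keys.
            self.running.remove(key)
        except KeyError:
            self.log.debug("Could not find key: %s", str(key))
        self.event_buffer[key] = state, info
   ```

   If the pod's completion event is lost or never processed, nothing else appears to take the key out of `self.running`, which would match what we observe.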
   
   ### How to reproduce
   
   Beats me. Our initial assumption was that this was a DAG implementation issue and some particular DAG was misbehaving, but the problem has occurred with all sorts of DAGs, happens for both scheduled and manual runs, and is sporadic. There is presumably some edge scenario that causes it, but we have been unable to nail it down any further.
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   aiofiles==23.1.0
   aiohttp==3.8.4
   airflow-dbt>=0.4.0
   airflow-exporter==1.5.3
   anytree==2.8.0
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-http>=2.0.3
   apache-airflow-providers-microsoft-mssql==2.1.3
   apache-airflow-providers-snowflake>=4.0.4
   apache-airflow-providers-hashicorp==3.3.0
   apache-airflow-providers-cncf-kubernetes==5.2.2
   apache-airflow>=2.2.3
   asgiref==3.5.0
   Authlib==0.15.5
   dbt-snowflake==1.5.2
   flatdict==4.0.1
   hvac==0.11.2
   jsonschema>=4.17.3
   pandas==1.3.5
   psycopg2-binary==2.9.3
   pyOpenSSL==23.1.1
   pysftp==0.2.9
   pysmbclient==0.1.5
   python-gnupg==0.5.0
   PyYAML~=5.4.1
   requests~=2.26.0
   smbprotocol==1.9.0
   snowflake-connector-python==3.0.4
   snowflake-sqlalchemy==1.4.7
   statsd==3.3.0
   py7zr==0.20.5
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Airflow is deployed via the official Helm chart on EKS in AWS. There are two scheduler pods with `AIRFLOW__CORE__PARALLELISM` set to `10`.
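
   Given that `open_slots` is derived from `self.running` (see the sketch in the description above), this low parallelism means the starvation shows up quickly. A rough back-of-the-envelope, assuming `open_slots = parallelism - len(self.running)`:

   ```python
# Rough arithmetic for one scheduler, assuming open_slots = parallelism - len(self.running).
parallelism = 10          # AIRFLOW__CORE__PARALLELISM
leaked_running_keys = 10  # stale TaskInstanceKeys that are never removed
open_slots = parallelism - leaked_running_keys  # -> 0, the executor reports no free capacity
   ```

   So roughly ten leaked keys are enough for new tasks to sit queued until the scheduler pod is restarted.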
   
   ### Anything else
   
   N/A
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

