michaelmicheal opened a new issue, #28480:
URL: https://github.com/apache/airflow/issues/28480

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   Airflow Version: 2.4.3
   
   In one of our instances, after upgrading to Airflow 2.4.3, we started seeing the [`purge_inactive_dag_warnings`](https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L598) query fail in a subset of parsing loops. The failure is intermittent, but it consistently occurs within 500 parsing loops.
   
   ```python
   self._deactivate_stale_dags()
   DagWarning.purge_inactive_dag_warnings()
   refreshed_dag_dir = self._refresh_dag_dir()
   ```
   Because the exception is unhandled, the `DagFileProcessorManager` consistently exits before processing all DAG files.
   
   ```python
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
       self.run()
     File "/usr/local/lib/python3.9/multiprocessing/process.py", line 108, in run
       self._target(*self._args, **self._kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 257, in _run_processor_manager
       processor_manager.start()
     File "/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 489, in start
       return self._run_parsing_loop()
     File "/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 609, in _run_parsing_loop
       DagWarning.purge_inactive_dag_warnings()
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagwarning.py", line 82, in purge_inactive_dag_warnings
       query.delete(synchronize_session=False)
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 3191, in delete
       result = self.session.execute(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1689, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.9/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.9/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/usr/local/lib/python3.9/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
   [SQL: DELETE FROM dag_warning USING dag_warning, dag WHERE dag_warning.dag_id = dag.dag_id AND dag.is_active = false]
   ```
   
   
   ### What you think should happen instead
   
   To work around this, we wrapped the query in a try-except block so that the exception no longer goes unhandled. I think we should:
   
   1. Wrap the `purge_inactive_dag_warnings` call in a try-except block.
   2. Consider not calling the `purge_inactive_dag_warnings` method on every parsing loop. Maybe we could call it only once all the DAG files have been parsed.
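
A minimal sketch of how the two suggestions could be combined, using only the standard library. `GuardedPeriodicTask`, `maybe_run`, and `flaky_purge` are hypothetical names for illustration and are not part of Airflow. The sketch throttles by elapsed time rather than by "all DAG files parsed", which is an assumption made to keep the example self-contained. It also catches a broad `Exception`; a real change would probably narrow that to `sqlalchemy.exc.OperationalError`.

```python
import logging
import time

log = logging.getLogger(__name__)


class GuardedPeriodicTask:
    """Run a non-critical callable at most once per interval, logging failures.

    Hypothetical helper for illustration; not part of Airflow.
    """

    def __init__(self, func, min_interval_seconds, clock=time.monotonic):
        self._func = func
        self._min_interval = min_interval_seconds
        self._clock = clock
        self._last_attempt = None

    def maybe_run(self):
        """Return True if func ran successfully, False if skipped or failed."""
        now = self._clock()
        if self._last_attempt is not None and now - self._last_attempt < self._min_interval:
            return False  # attempted too recently; skip this loop iteration
        self._last_attempt = now
        try:
            self._func()
        except Exception:
            # Log the traceback and carry on, so the caller's loop survives.
            log.exception("Non-critical task failed; continuing")
            return False
        return True


def flaky_purge():
    # Stand-in for DagWarning.purge_inactive_dag_warnings() losing its connection.
    raise ConnectionError("Lost connection to MySQL server during query")


purge_task = GuardedPeriodicTask(flaky_purge, min_interval_seconds=60)
purge_task.maybe_run()  # logs the error instead of terminating the process
```

With a wrapper like this, the parsing loop would call `purge_task.maybe_run()` in place of the direct purge call, so a failed purge is retried on a later loop instead of terminating the manager.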
   
   ### How to reproduce
   
   Hard to reproduce, but it will happen whenever the `purge_inactive_dag_warnings` query throws an exception.
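
One way to force the failure deterministically is to patch the purge call so that it raises. The sketch below shows the idea against stand-ins only: `FakeDagWarning`, `run_parsing_loops`, and `reproduce` are hypothetical, and the loop is a heavily simplified assumption about the manager's behaviour, not Airflow's actual code.

```python
from unittest import mock


class FakeDagWarning:
    """Stand-in for the real model; only the method name mirrors the report."""

    @staticmethod
    def purge_inactive_dag_warnings():
        return None


def run_parsing_loops(n_loops):
    """Simplified stand-in for the manager's loop: purge, then count the loop."""
    completed = 0
    for _ in range(n_loops):
        FakeDagWarning.purge_inactive_dag_warnings()  # unguarded, as in the report
        completed += 1
    return completed


def reproduce(fail_on_call, n_loops):
    """Make the purge raise on its Nth call; return (escaped exception, purge call count)."""
    effects = [None] * (fail_on_call - 1) + [ConnectionError("Lost connection during query")]
    with mock.patch.object(
        FakeDagWarning, "purge_inactive_dag_warnings", side_effect=effects
    ) as purge:
        try:
            run_parsing_loops(n_loops)
        except ConnectionError as exc:
            # The exception escaped the loop, so the remaining iterations never ran.
            return exc, purge.call_count
    return None, n_loops
```

Calling `reproduce(3, 10)` returns the `ConnectionError` together with a call count of 3, which shows the loop stopping early. The same patching approach could, in principle, be pointed at the real `DagWarning` class in a test environment.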
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Other 3rd-party Helm chart
   
   ### Anything else
   
   Airflow 2.4.3 on Kubernetes
   MySQL Version: 8.0.18
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

