amagr opened a new issue, #29687:
URL: https://github.com/apache/airflow/issues/29687

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   **Airflow 2.4.2**
   We ran into a problem where HttpSensor fails with a deadlock error. We are running 3 different DAGs with max_active_runs=12; each calls an API and checks the response to decide whether to reschedule the sensor or move on to the next task. All of these sensors use a 1-minute poke interval, so 36 of them are running at the same time. Occasionally (roughly once in 20 runs) we get the following deadlock error:
   
```
Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1457, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1579, in _execute_task_with_callbacks
    RenderedTaskInstanceFields.write(rtif)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 36, in create_session
    session.commit()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1428, in commit
    self._transaction.commit(_to_root=self.future)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 633, in execute
    uow,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 241, in save_obj
    update,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    statement, multiparams, execution_options=execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 326, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1491, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2027, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE rendered_task_instance_fields SET k8s_pod_yaml=%s WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND rendered_task_instance_fields.run_id = %s AND rendered_task_instance_fields.map_index = %s]
[parameters: ('{"metadata": {"annotations": {"dag_id": "bidder-joiner", "task_id": "capitest", "try_number": "1", "run_id": "scheduled__2023-02-15T14:15:00+00:00"}, ... (511 characters truncated) ... e": "AIRFLOW_IS_K8S_EXECUTOR_POD", "value": "True"}], "image": "artifactorymaster.outbrain.com:5005/datainfra/airflow:8cbd2a3d8c", "name": "base"}]}}', 'bidder-joiner', 'capitest', 'scheduled__2023-02-15T14:15:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
```
```
Failed to execute job 3966 for task capitest ((MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE rendered_task_instance_fields SET k8s_pod_yaml=%s WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND rendered_task_instance_fields.run_id = %s AND rendered_task_instance_fields.map_index = %s]
[parameters: ('{"metadata": {"annotations": {"dag_id": "bidder-joiner", "task_id": "capitest", "try_number": "1", "run_id": "scheduled__2023-02-15T14:15:00+00:00"}, ... (511 characters truncated) ... e": "AIRFLOW_IS_K8S_EXECUTOR_POD", "value": "True"}], "image": "artifactorymaster.outbrain.com:5005/datainfra/airflow:8cbd2a3d8c", "name": "base"}]}}', 'bidder-joiner', 'capitest', 'scheduled__2023-02-15T14:15:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/e3q8); 68)
```
   
   I checked the MySQL logs, and the deadlock is caused by this query (the statement is truncated in the log):
```sql
DELETE FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = 'bidder-joiner-raw_data_2nd_pass_delay'
  AND rendered_task_instance_fields.task_id = 'is_data_ready'
  AND ((rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id,
        rendered_task_instance_fields.run_id) NOT IN
    (SELECT subq2.dag_id, subq2.task_id, subq2.run_id
     FROM (SELECT subq1.dag_id AS dag_id, subq1.task_id AS task_id, subq1.run_id AS run_id
           FROM (SELECT DISTINCT rendered_task_instance_fields.dag_id AS dag_id,
                                 rendered_task_instance_fields.task_id AS task_id,
                                 rendered_task_instance_fields.run_id AS run_id,
                                 dag_run.execution_date AS execution_date
                 FROM rendered_task_instance_fields
                 INNER JOIN dag_run
                    ON rendered_task_instance_fields.dag_id = dag_run.dag_id
                   AND rendered_task_instance_fields.run_id = dag_run.run_id
                 WHERE rendered_task_instance_fields.dag_id = 'bidder-joiner-raw_data
```
   
   ### What you think should happen instead
   
   I found a similar open issue on GitHub (https://github.com/apache/airflow/issues/25765), so I think this should be resolved the same way: by adding the @retry_db_transaction decorator to the function that executes this query.
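   Airflow ships such a decorator as `airflow.utils.retries.retry_db_transaction`. As a rough illustration of the pattern being proposed (not Airflow's actual implementation), a simplified stand-alone sketch could look like this; `OperationalError` here is a stand-in class for the real `sqlalchemy.exc.OperationalError`, and `delete_old_records` only simulates the DELETE that deadlocks:

```python
import functools
import time


class OperationalError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError (MySQL error 1213)."""


def retry_db_transaction(retries=3, backoff=0.0):
    """Retry a DB operation that fails with a transient deadlock.

    Simplified sketch of the idea behind Airflow's
    airflow.utils.retries.retry_db_transaction.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, retries + 1):
                try:
                    return func(*args, **kwargs)
                except OperationalError:
                    if attempt == retries:
                        raise  # out of attempts: propagate the deadlock error
                    time.sleep(backoff * attempt)  # optional linear backoff
        return wrapper
    return decorator


attempts = []


@retry_db_transaction(retries=3)
def delete_old_records():
    """Simulates the rendered_task_instance_fields DELETE: deadlocks twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise OperationalError(1213, "Deadlock found when trying to get lock")
    return "deleted"
```

   With this in place, the two simulated deadlocks are swallowed and the third attempt succeeds, which is the behavior the linked issue asked for on the write path.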
   
   ### How to reproduce
   
   Create 3 DAGs with max_active_runs=12 that run an HttpSensor at the same time, with the same poke interval and mode="reschedule".
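   A minimal DAG along these lines might look as follows; the dag_id, connection id, endpoint, and schedule are placeholders I chose for illustration, not values from the report:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor

# Duplicate this file ~3 times with distinct dag_ids so that
# 3 DAGs x 12 active runs = 36 sensors poke concurrently.
with DAG(
    dag_id="deadlock-repro-1",          # placeholder name
    start_date=datetime(2023, 2, 1),
    schedule=timedelta(minutes=15),
    max_active_runs=12,
    catchup=True,
) as dag:
    HttpSensor(
        task_id="is_data_ready",
        http_conn_id="my_api",          # assumed connection id
        endpoint="status",              # assumed endpoint
        poke_interval=60,               # 1-minute poke interval
        mode="reschedule",              # frees the worker slot between pokes
    )
```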
   
   ### Operating System
   
   This info is not available right now.
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-sql>=1.2.0
   mysql-connector-python>=8.0.11
   mysqlclient>=1.3.6
   apache-airflow-providers-mysql==3.2.1
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-slack==6.0.0
   apache-airflow-providers-apache-spark==3.0.0
   
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]