amagr opened a new issue, #29687:
URL: https://github.com/apache/airflow/issues/29687
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
**Airflow 2.4.2**
We ran into a problem where HttpSensor fails because of a database deadlock. We
are running 3 different DAGs with max_active_runs=12 that call an API and check
the response to decide whether to reschedule the sensor or move on to the next
task. All of these sensors have a 1-minute poke interval, so 36 of them are
running at the same time. Sometimes (roughly once in 20 runs) we get the
following deadlock error:
```
Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1457, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1579, in _execute_task_with_callbacks
    RenderedTaskInstanceFields.write(rtif)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 36, in create_session
    session.commit()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1428, in commit
    self._transaction.commit(_to_root=self.future)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 633, in execute
    uow,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 241, in save_obj
    update,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    statement, multiparams, execution_options=execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 326, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1491, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2027, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE rendered_task_instance_fields SET k8s_pod_yaml=%s WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND rendered_task_instance_fields.run_id = %s AND rendered_task_instance_fields.map_index = %s]
[parameters: ('{"metadata": {"annotations": {"dag_id": "bidder-joiner", "task_id": "capitest", "try_number": "1", "run_id": "scheduled__2023-02-15T14:15:00+00:00"}, ... (511 characters truncated) ... e": "AIRFLOW_IS_K8S_EXECUTOR_POD", "value": "True"}], "image": "artifactorymaster.outbrain.com:5005/datainfra/airflow:8cbd2a3d8c", "name": "base"}]}}', 'bidder-joiner', 'capitest', 'scheduled__2023-02-15T14:15:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
```
```
Failed to execute job 3966 for task capitest ((MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE rendered_task_instance_fields SET k8s_pod_yaml=%s WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND rendered_task_instance_fields.run_id = %s AND rendered_task_instance_fields.map_index = %s]
[parameters: ('{"metadata": {"annotations": {"dag_id": "bidder-joiner", "task_id": "capitest", "try_number": "1", "run_id": "scheduled__2023-02-15T14:15:00+00:00"}, ... (511 characters truncated) ... e": "AIRFLOW_IS_K8S_EXECUTOR_POD", "value": "True"}], "image": "artifactorymaster.outbrain.com:5005/datainfra/airflow:8cbd2a3d8c", "name": "base"}]}}', 'bidder-joiner', 'capitest', 'scheduled__2023-02-15T14:15:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/e3q8); 68)
```
I checked the MySQL logs, and the deadlock is caused by this query (truncated as it appears in the log):

```sql
DELETE FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = 'bidder-joiner-raw_data_2nd_pass_delay'
  AND rendered_task_instance_fields.task_id = 'is_data_ready'
  AND ((rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id,
        rendered_task_instance_fields.run_id) NOT IN
    (SELECT subq2.dag_id, subq2.task_id, subq2.run_id
     FROM (SELECT subq1.dag_id AS dag_id, subq1.task_id AS task_id, subq1.run_id AS run_id
           FROM (SELECT DISTINCT rendered_task_instance_fields.dag_id AS dag_id,
                                 rendered_task_instance_fields.task_id AS task_id,
                                 rendered_task_instance_fields.run_id AS run_id,
                                 dag_run.execution_date AS execution_date
                 FROM rendered_task_instance_fields
                 INNER JOIN dag_run
                         ON rendered_task_instance_fields.dag_id = dag_run.dag_id
                        AND rendered_task_instance_fields.run_id = dag_run.run_id
                 WHERE rendered_task_instance_fields.dag_id = 'bidder-joiner-raw_data
```
### What you think should happen instead
I found a similar issue open on GitHub
(https://github.com/apache/airflow/issues/25765), so I think this should be
resolved in the same way: by adding the `@retry_db_transaction` decorator to the
function that executes this query.
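To illustrate the suggested fix: `retry_db_transaction` re-runs a session-bound function when the database raises a transient error such as deadlock 1213, which is exactly what MySQL's "try restarting transaction" hint asks for. Below is a minimal, self-contained sketch of that retry pattern; the decorator and the `OperationalError` stand-in are written from scratch here and are not Airflow's actual implementation.

```python
import functools
import time


class OperationalError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError (e.g. MySQL deadlock 1213)."""


def retry_db_transaction(retries=3, delay=0.0):
    """Retry the wrapped function when the DB reports a transient error."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            for attempt in range(1, retries + 1):
                try:
                    return func(*args, **kwargs)
                except OperationalError:
                    if attempt == retries:
                        raise  # give up after the last attempt
                    time.sleep(delay)  # brief pause before restarting the transaction
        return wrapped
    return decorator


# Demo: a "delete" that deadlocks twice, then succeeds on the third attempt.
attempts = []


@retry_db_transaction(retries=3)
def delete_old_records():
    attempts.append(1)
    if len(attempts) < 3:
        raise OperationalError("(1213, 'Deadlock found when trying to get lock')")
    return "deleted"


result = delete_old_records()
print(result, len(attempts))  # deleted 3
```

With this pattern, the occasional deadlock between the DELETE housekeeping query and the concurrent UPDATE would be retried instead of failing the task outright.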
### How to reproduce
Create 3 DAGs with max_active_runs=12 that use an HttpSensor at the same time,
with the same poke interval and mode="reschedule".
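A reproduction DAG along those lines might look like the sketch below (the dag_id, connection id, and endpoint are made up for illustration; it assumes `apache-airflow-providers-http` is installed, and three copies of this DAG would yield ~36 concurrently poking sensors):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor

# Hypothetical repro: three variants of this DAG (deadlock-repro-1..3), each
# with max_active_runs=12, give ~36 reschedule-mode sensors poking at once.
with DAG(
    dag_id="deadlock-repro-1",
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=5),
    max_active_runs=12,
    catchup=True,
) as dag:
    HttpSensor(
        task_id="is_data_ready",
        http_conn_id="my_api",   # assumed connection id
        endpoint="status",       # assumed endpoint
        poke_interval=60,        # 1-minute poke interval, as in our setup
        mode="reschedule",
    )
```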
### Operating System
This information is not available right now.
### Versions of Apache Airflow Providers
apache-airflow-providers-common-sql>=1.2.0
mysql-connector-python>=8.0.11
mysqlclient>=1.3.6
apache-airflow-providers-mysql==3.2.1
apache-airflow-providers-http==4.0.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-apache-spark==3.0.0
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]