humit0 opened a new issue, #23639:
URL: https://github.com/apache/airflow/issues/23639
### Apache Airflow version
2.2.5
### What happened
When many deferrable operators (e.g. `TimeDeltaSensorAsync`) are created, the
triggerer component dies because of a DB deadlock.
```
[2022-05-11 02:45:08,420] {triggerer_job.py:358} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00>
(ID 5397) starting
[2022-05-11 02:45:08,421] {triggerer_job.py:358} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00>
(ID 5398) starting
[2022-05-11 02:45:09,459] {triggerer_job.py:358} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00>
(ID 5400) starting
[2022-05-11 02:45:09,461] {triggerer_job.py:358} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00>
(ID 5399) starting
[2022-05-11 02:45:10,503] {triggerer_job.py:358} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00>
(ID 5401) starting
[2022-05-11 02:45:10,504] {triggerer_job.py:358} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00>
(ID 5402) starting
[2022-05-11 02:45:11,113] {triggerer_job.py:108} ERROR - Exception when
executing TriggererJob._run_trigger_loop
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1276, in _execute_context
self.dialect.do_execute(
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line
254, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to
get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File
"/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line
106, in _execute
self._run_trigger_loop()
File
"/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line
127, in _run_trigger_loop
Trigger.clean_unused()
File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py",
line 91, in clean_unused
session.query(TaskInstance).filter(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py",
line 4063, in update
update_op.exec_()
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
1697, in exec_
self._do_exec()
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
1895, in _do_exec
self._execute_stmt(update_stmt)
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py",
line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py",
line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py",
line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1276, in _execute_context
self.dialect.do_execute(
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line
254, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError)
(1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s
AND task_instance.trigger_id IS NOT NULL]
[parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-05-11 02:45:11,118] {triggerer_job.py:111} INFO - Waiting for triggers
to clean up
[2022-05-11 02:45:11,592] {triggerer_job.py:117} INFO - Exited trigger loop
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1276, in _execute_context
self.dialect.do_execute(
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line
254, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to
get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/site-packages/airflow/__main__.py", line
48, in main
args.func(args)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line
92, in wrapper
return f(*args, **kwargs)
File
"/usr/local/lib/python3.8/site-packages/airflow/cli/commands/triggerer_command.py",
line 56, in triggerer
job.run()
File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py",
line 246, in run
self._execute()
File
"/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line
106, in _execute
self._run_trigger_loop()
File
"/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line
127, in _run_trigger_loop
Trigger.clean_unused()
File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py",
line 91, in clean_unused
session.query(TaskInstance).filter(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py",
line 4063, in update
update_op.exec_()
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
1697, in exec_
self._do_exec()
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
1895, in _do_exec
self._execute_stmt(update_stmt)
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line
1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py",
line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py",
line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py",
line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1276, in _execute_context
self.dialect.do_execute(
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line
254, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError)
(1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s
AND task_instance.trigger_id IS NOT NULL]
[parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
```
### What you think should happen instead
The triggerer process should not die with a deadlock error.
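
The MySQL message in the log above itself suggests "try restarting transaction". As a rough illustration of that idea only (this is not how Airflow handles it, and the names `retry_on_deadlock`, `is_deadlock`, `max_attempts`, and `base_delay` are hypothetical), a retry wrapper around the failing update could look like this sketch. It assumes the driver exception carries the error code as its first argument, as the `(1213, 'Deadlock found ...')` tuple in the traceback shows, and that a SQLAlchemy wrapper exposes the driver exception as `.orig`.

```python
import functools
import time

MYSQL_DEADLOCK_ERRNO = 1213  # error code shown in the traceback


def is_deadlock(exc):
    """Return True if the (possibly wrapped) exception carries errno 1213."""
    # SQLAlchemy's DBAPI errors keep the driver exception in `.orig`;
    # fall back to the exception itself when there is no wrapper.
    orig = getattr(exc, "orig", exc)
    return bool(orig.args) and orig.args[0] == MYSQL_DEADLOCK_ERRNO


def retry_on_deadlock(max_attempts=3, base_delay=0.0):
    """Hypothetical decorator: re-run the function when a deadlock is reported."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Re-raise anything that is not a deadlock, and give up
                    # after the last allowed attempt.
                    if not is_deadlock(exc) or attempt == max_attempts:
                        raise
                    time.sleep(base_delay * attempt)

        return wrapper

    return decorator
```

With something like this, a single deadlock in the cleanup query would cost one retry instead of stopping the triggerer. The caller would still need to roll back the failed transaction before retrying.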
### How to reproduce
Create the `test_timedelta` DAG below and run it.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.time_delta import TimeDeltaSensorAsync

default_args = {
    "owner": "user",
    "start_date": datetime(2021, 2, 8),
    "retries": 2,
    "retry_delay": timedelta(minutes=20),
    "depends_on_past": False,
}

with DAG(
    dag_id="test_timedelta",
    default_args=default_args,
    schedule_interval="10 11 * * *",
    max_active_runs=1,
    max_active_tasks=2,
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    # Create 800 deferrable sensors so the triggerer handles many triggers at once.
    for idx in range(800):
        tx = TimeDeltaSensorAsync(
            task_id=f"sleep_{idx}",
            delta=timedelta(days=3),
        )
        start >> tx >> end
```
### Operating System
uname_result(system='Linux', node='d2845d6331fd',
release='5.10.104-linuxkit', version='#1 SMP Thu Mar 17 17:08:06 UTC 2022',
machine='x86_64', processor='')
### Versions of Apache Airflow Providers
apache-airflow-providers-apache-druid | 2.3.3
apache-airflow-providers-apache-hive | 2.3.2
apache-airflow-providers-apache-spark | 2.1.3
apache-airflow-providers-celery | 2.1.3
apache-airflow-providers-ftp | 2.1.2
apache-airflow-providers-http | 2.1.2
apache-airflow-providers-imap | 2.2.3
apache-airflow-providers-jdbc | 2.1.3
apache-airflow-providers-mysql | 2.2.3
apache-airflow-providers-postgres | 4.1.0
apache-airflow-providers-redis | 2.0.4
apache-airflow-providers-sqlite | 2.1.3
apache-airflow-providers-ssh | 2.4.3
### Deployment
Other Docker-based deployment
### Deployment details
webserver: 1 instance
scheduler: 1 instance
worker: 1 instance (Celery)
triggerer: 1 instance
redis: 1 instance
Database: 1 instance (mysql)
### Anything else
webserver: 172.19.0.9
scheduler: 172.19.0.7
triggerer: 172.19.0.5
worker: 172.19.0.8
MySQL (`SHOW ENGINE INNODB STATUS;`)
```
------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-05-11 07:47:49 139953955817216
*** (1) TRANSACTION:
TRANSACTION 544772, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 20, OS thread handle 139953861383936, query id 228318
172.19.0.5 airflow_user updating
UPDATE task_instance SET trigger_id=NULL WHERE task_instance.state !=
'deferred' AND task_instance.trigger_id IS NOT NULL
*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table
`airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap
Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info
bits 0
0: len 6; hex 717565756564; asc queued;;
1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
2: len 9; hex 736c6565705f323436; asc sleep_246;;
3: len 30; hex
7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc
scheduled__2022-05-09T11:10:00; (total 36 bytes);
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 125 page no 47 n bits 128 index PRIMARY of table
`airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap
waiting
Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info
bits 0
0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
1: len 9; hex 736c6565705f323436; asc sleep_246;;
2: len 30; hex
7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc
scheduled__2022-05-09T11:10:00; (total 36 bytes);
3: len 6; hex 000000085001; asc P ;;
4: len 7; hex 01000001411e2f; asc A /;;
5: len 7; hex 627b6a250b612d; asc b{j% a-;;
6: SQL NULL;
7: SQL NULL;
8: len 7; hex 72756e6e696e67; asc running;;
9: len 4; hex 80000001; asc ;;
10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
11: len 4; hex 726f6f74; asc root;;
12: len 4; hex 8000245e; asc $^;;
13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
14: len 7; hex 64656661756c74; asc default;;
15: len 4; hex 80000002; asc ;;
16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc
TimeDeltaSensorAsync;;
17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
18: SQL NULL;
19: len 4; hex 80000002; asc ;;
20: len 5; hex 80057d942e; asc } .;;
21: len 4; hex 80000001; asc ;;
22: len 4; hex 800021c7; asc ! ;;
23: len 30; hex
36353061663737642d363762372d343166382d383439342d636637333061; asc
650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
24: SQL NULL;
25: SQL NULL;
26: SQL NULL;
27: len 2; hex 0400; asc ;;
*** (2) TRANSACTION:
TRANSACTION 544769, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1128, 4 row lock(s), undo log entries 2
MySQL thread id 12010, OS thread handle 139953323235072, query id 228319
172.19.0.8 airflow_user updating
UPDATE task_instance SET start_date='2022-05-11 07:47:49.745773',
state='running', try_number=1, hostname='d2845d6331fd', job_id=9310 WHERE
task_instance.task_id = 'sleep_246' AND task_instance.dag_id = 'test_timedelta'
AND task_instance.run_id = 'scheduled__2022-05-09T11:10:00+00:00'
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 125 page no 47 n bits 120 index PRIMARY of table
`airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap
Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info
bits 0
0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
1: len 9; hex 736c6565705f323436; asc sleep_246;;
2: len 30; hex
7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc
scheduled__2022-05-09T11:10:00; (total 36 bytes);
3: len 6; hex 000000085001; asc P ;;
4: len 7; hex 01000001411e2f; asc A /;;
5: len 7; hex 627b6a250b612d; asc b{j% a-;;
6: SQL NULL;
7: SQL NULL;
8: len 7; hex 72756e6e696e67; asc running;;
9: len 4; hex 80000001; asc ;;
10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
11: len 4; hex 726f6f74; asc root;;
12: len 4; hex 8000245e; asc $^;;
13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
14: len 7; hex 64656661756c74; asc default;;
15: len 4; hex 80000002; asc ;;
16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc
TimeDeltaSensorAsync;;
17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
18: SQL NULL;
19: len 4; hex 80000002; asc ;;
20: len 5; hex 80057d942e; asc } .;;
21: len 4; hex 80000001; asc ;;
22: len 4; hex 800021c7; asc ! ;;
23: len 30; hex
36353061663737642d363762372d343166382d383439342d636637333061; asc
650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
24: SQL NULL;
25: SQL NULL;
26: SQL NULL;
27: len 2; hex 0400; asc ;;
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table
`airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap
waiting
Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info
bits 0
0: len 6; hex 717565756564; asc queued;;
1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
2: len 9; hex 736c6565705f323436; asc sleep_246;;
3: len 30; hex
7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc
scheduled__2022-05-09T11:10:00; (total 36 bytes);
*** WE ROLL BACK TRANSACTION (1)
```
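
My reading of the dump above is that the two transactions take the same two index records in opposite order: the triggerer (172.19.0.5, trx 544772) holds the `ti_state` record and waits for the `PRIMARY` record, while the worker (172.19.0.8, trx 544769) holds `PRIMARY` and waits for `ti_state`. The sketch below models only that relationship as a wait-for graph and checks it for a cycle. The helper names `build_wait_for_graph` and `find_cycle` are made up for illustration, and this is not how InnoDB implements its detection.

```python
# Lock ownership as reported in the InnoDB status output.
HOLDS = {
    "trx 544772 (triggerer)": "ti_state",
    "trx 544769 (worker)": "PRIMARY",
}
WAITS = {
    "trx 544772 (triggerer)": "PRIMARY",
    "trx 544769 (worker)": "ti_state",
}


def build_wait_for_graph(holds, waits):
    """Map each waiting transaction to the transaction holding the lock it wants."""
    owner = {lock: trx for trx, lock in holds.items()}
    return {trx: owner[lock] for trx, lock in waits.items() if lock in owner}


def find_cycle(graph):
    """Return the transactions forming a cycle, or None if there is none."""
    for start in graph:
        path = [start]
        node = graph.get(start)
        # Follow the single outgoing edge until we leave the graph or loop.
        while node is not None and node not in path:
            path.append(node)
            node = graph.get(node)
        if node == start:
            return path
    return None


if __name__ == "__main__":
    print(find_cycle(build_wait_for_graph(HOLDS, WAITS)))
```

A cycle here means neither transaction can proceed, so MySQL rolls one of them back (transaction 1, the triggerer's cleanup `UPDATE`, in this case).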
Airflow env
```
AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow_user:airflow_pass@mysql/airflow_db
AIRFLOW__CORE__DEFAULT_TIMEZONE=KST
AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=KST
AIRFLOW_HOME=/home/deploy/airflow
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__WEBSERVER__SECRET_KEY=aoiuwernholo
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow_user:airflow_pass@mysql/airflow_db
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)