humit0 opened a new issue, #23639:
URL: https://github.com/apache/airflow/issues/23639

   ### Apache Airflow version
   
   2.2.5
   
   ### What happened
   
   When many deferrable operators (e.g. `TimeDeltaSensorAsync`) are created, the triggerer component dies because of a database deadlock.
   
   ```
   [2022-05-11 02:45:08,420] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5397) starting
   [2022-05-11 02:45:08,421] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5398) starting
   [2022-05-11 02:45:09,459] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5400) starting
   [2022-05-11 02:45:09,461] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5399) starting
   [2022-05-11 02:45:10,503] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5401) starting
   [2022-05-11 02:45:10,504] {triggerer_job.py:358} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2022-05-13T11:10:00+00:00> (ID 5402) starting
   [2022-05-11 02:45:11,113] {triggerer_job.py:108} ERROR - Exception when executing TriggererJob._run_trigger_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
       self._run_trigger_loop()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 127, in _run_trigger_loop
       Trigger.clean_unused()
     File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py", line 91, in clean_unused
       session.query(TaskInstance).filter(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 4063, in update
       update_op.exec_()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
       self._do_exec()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
       self._execute_stmt(update_stmt)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
       self.result = self.query._execute_crud(stmt, self.mapper)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
       return conn.execute(stmt, self._params)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s AND task_instance.trigger_id IS NOT NULL]
   [parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   [2022-05-11 02:45:11,118] {triggerer_job.py:111} INFO - Waiting for triggers to clean up
   [2022-05-11 02:45:11,592] {triggerer_job.py:117} INFO - Exited trigger loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/triggerer_command.py", line 56, in triggerer
       job.run()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 246, in run
       self._execute()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
       self._run_trigger_loop()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py", line 127, in _run_trigger_loop
       Trigger.clean_unused()
     File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/models/trigger.py", line 91, in clean_unused
       session.query(TaskInstance).filter(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 4063, in update
       update_op.exec_()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
       self._do_exec()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
       self._execute_stmt(update_stmt)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
       self.result = self.query._execute_crud(stmt, self.mapper)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
       return conn.execute(stmt, self._params)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: UPDATE task_instance SET trigger_id=%s WHERE task_instance.state != %s AND task_instance.trigger_id IS NOT NULL]
   [parameters: (None, <TaskInstanceState.DEFERRED: 'deferred'>)]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
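   The failing statement is the bulk `UPDATE` issued from `Trigger.clean_unused` (see the `[SQL: ...]` line in the log). The sketch below replays that statement against an in-memory SQLite table to show which rows it touches: every task instance that has already left the `deferred` state but still carries a `trigger_id`. Those can be rows a worker is updating at the same moment. The three-column table and the sample rows are made up for illustration and are not Airflow's real schema; SQLite also uses `?` placeholders where MySQLdb uses `%s`.

   ```python
   import sqlite3

   # Simplified stand-in for Airflow's task_instance table (assumption:
   # only the columns referenced by the failing UPDATE are modelled).
   conn = sqlite3.connect(":memory:")
   conn.execute(
       "CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT, trigger_id INTEGER)"
   )
   # Made-up sample rows, not taken from the report.
   conn.executemany(
       "INSERT INTO task_instance VALUES (?, ?, ?)",
       [
           ("sleep_245", "deferred", 101),  # still waiting on its trigger
           ("sleep_246", "queued", 102),    # left 'deferred', trigger_id not cleared yet
           ("sleep_247", "success", None),  # never pointed at a trigger
       ],
   )

   # Same predicate as the statement in the traceback, with the
   # parameters (None, 'deferred') bound in the same order.
   cur = conn.execute(
       "UPDATE task_instance SET trigger_id = ? "
       "WHERE task_instance.state != ? AND task_instance.trigger_id IS NOT NULL",
       (None, "deferred"),
   )
   print("rows touched:", cur.rowcount)
   for row in conn.execute("SELECT task_id, state, trigger_id FROM task_instance ORDER BY task_id"):
       print(row)
   ```

   Only the `queued` row is updated here; the `deferred` row keeps its trigger, and the row with no trigger is not matched at all.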
   
   ### What you think should happen instead
   
   The triggerer should not die because of a database deadlock error.
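   MySQL's own message for error 1213 in the log says to "try restarting transaction". One way the clean-up could survive a deadlock, sketched here as an assumption and not as Airflow's API or its actual fix, is to retry it a bounded number of times when the database reports code 1213. `run_with_deadlock_retry`, `is_deadlock` and `FakeOperationalError` are hypothetical names. The check looks at `args[0]` of the driver exception, which is where the `(1213, 'Deadlock found ...')` tuple in the log lives, and at SQLAlchemy's `.orig` attribute, which holds the wrapped DB-API error.

   ```python
   import time
   from typing import Callable, TypeVar

   T = TypeVar("T")

   MYSQL_ER_LOCK_DEADLOCK = 1213  # code from the log: 'Deadlock found when trying to get lock'


   def is_deadlock(exc: BaseException) -> bool:
       """Return True if exc, or the DB-API error it wraps, carries MySQL code 1213."""
       # sqlalchemy.exc.DBAPIError keeps the driver exception in `.orig`;
       # a raw driver error is inspected directly.
       inner = getattr(exc, "orig", exc)
       args = getattr(inner, "args", ())
       return bool(args) and args[0] == MYSQL_ER_LOCK_DEADLOCK


   def run_with_deadlock_retry(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.1) -> T:
       """Hypothetical helper: call fn, retrying with exponential backoff on deadlocks.

       fn is assumed to run one complete transaction. MySQL has already rolled
       back the victim transaction when 1213 is raised; an ORM session would
       also need session.rollback() before the next attempt.
       """
       if attempts < 1:
           raise ValueError("attempts must be at least 1")
       for attempt in range(1, attempts + 1):
           try:
               return fn()
           except Exception as exc:
               # Re-raise anything that is not a deadlock, or the final failure.
               if not is_deadlock(exc) or attempt == attempts:
                   raise
               time.sleep(base_delay * 2 ** (attempt - 1))
       raise AssertionError("unreachable")


   # Usage with a stand-in for MySQLdb._exceptions.OperationalError.
   class FakeOperationalError(Exception):
       pass


   calls = {"n": 0}


   def flaky_cleanup() -> str:
       """Simulated clean-up that deadlocks twice before succeeding."""
       calls["n"] += 1
       if calls["n"] < 3:
           raise FakeOperationalError(1213, "Deadlock found when trying to get lock; try restarting transaction")
       return "cleaned"


   print(run_with_deadlock_retry(flaky_cleanup, base_delay=0.01), calls["n"])
   ```

   Non-deadlock errors are re-raised immediately, so a retry loop like this would only mask the specific lock conflict shown in the report, not unrelated database failures.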
   
   ### How to reproduce
   
   Create the `test_timedelta` DAG below and run it.
   
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.dummy import DummyOperator
   from airflow.sensors.time_delta import TimeDeltaSensorAsync
   
   default_args = {
       "owner": "user",
       "start_date": datetime(2021, 2, 8),
       "retries": 2,
       "retry_delay": timedelta(minutes=20),
       "depends_on_past": False,
   }
   
   with DAG(
       dag_id="test_timedelta",
       default_args=default_args,
       schedule_interval="10 11 * * *",
       max_active_runs=1,
       max_active_tasks=2,
       catchup=False,
   ) as dag:
       start = DummyOperator(task_id="start")
       end = DummyOperator(task_id="end")
       for idx in range(800):
           tx = TimeDeltaSensorAsync(
               task_id=f"sleep_{idx}",
               delta=timedelta(days=3),
           )
           start >> tx >> end
   
   ```
   
   ### Operating System
   
   uname_result(system='Linux', node='d2845d6331fd', release='5.10.104-linuxkit', version='#1 SMP Thu Mar 17 17:08:06 UTC 2022', machine='x86_64', processor='')
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-druid | 2.3.3
   apache-airflow-providers-apache-hive  | 2.3.2
   apache-airflow-providers-apache-spark | 2.1.3
   apache-airflow-providers-celery       | 2.1.3
   apache-airflow-providers-ftp          | 2.1.2
   apache-airflow-providers-http         | 2.1.2
   apache-airflow-providers-imap         | 2.2.3
   apache-airflow-providers-jdbc         | 2.1.3
   apache-airflow-providers-mysql        | 2.2.3
   apache-airflow-providers-postgres     | 4.1.0
   apache-airflow-providers-redis        | 2.0.4
   apache-airflow-providers-sqlite       | 2.1.3
   apache-airflow-providers-ssh          | 2.4.3
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   webserver: 1 instance
   scheduler: 1 instance
   worker: 1 instance (Celery)
   triggerer: 1 instance
   redis: 1 instance
   Database: 1 instance (mysql)
   
   
   ### Anything else
   
   webserver: 172.19.0.9
   scheduler: 172.19.0.7
   triggerer: 172.19.0.5
   worker: 172.19.0.8
   
   MySQL (`SHOW ENGINE INNODB STATUS;`)
   ```
   ------------------------
   LATEST DETECTED DEADLOCK
   ------------------------
   2022-05-11 07:47:49 139953955817216
   *** (1) TRANSACTION:
   TRANSACTION 544772, ACTIVE 0 sec starting index read
   mysql tables in use 1, locked 1
   LOCK WAIT 7 lock struct(s), heap size 1128, 2 row lock(s)
   MySQL thread id 20, OS thread handle 139953861383936, query id 228318 172.19.0.5 airflow_user updating
   UPDATE task_instance SET trigger_id=NULL WHERE task_instance.state != 'deferred' AND task_instance.trigger_id IS NOT NULL
   
   *** (1) HOLDS THE LOCK(S):
   RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table `airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap
   Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info bits 0
    0: len 6; hex 717565756564; asc queued;;
    1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
    2: len 9; hex 736c6565705f323436; asc sleep_246;;
    3: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
   
   
   *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 125 page no 47 n bits 128 index PRIMARY of table `airflow_db`.`task_instance` trx id 544772 lock_mode X locks rec but not gap waiting
   Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info bits 0
    0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
    1: len 9; hex 736c6565705f323436; asc sleep_246;;
    2: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
    3: len 6; hex 000000085001; asc     P ;;
    4: len 7; hex 01000001411e2f; asc     A /;;
    5: len 7; hex 627b6a250b612d; asc b{j% a-;;
    6: SQL NULL;
    7: SQL NULL;
    8: len 7; hex 72756e6e696e67; asc running;;
    9: len 4; hex 80000001; asc     ;;
    10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
    11: len 4; hex 726f6f74; asc root;;
    12: len 4; hex 8000245e; asc   $^;;
    13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
    14: len 7; hex 64656661756c74; asc default;;
    15: len 4; hex 80000002; asc     ;;
    16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc TimeDeltaSensorAsync;;
    17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
    18: SQL NULL;
    19: len 4; hex 80000002; asc     ;;
    20: len 5; hex 80057d942e; asc   } .;;
    21: len 4; hex 80000001; asc     ;;
    22: len 4; hex 800021c7; asc   ! ;;
    23: len 30; hex 36353061663737642d363762372d343166382d383439342d636637333061; asc 650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
    24: SQL NULL;
    25: SQL NULL;
    26: SQL NULL;
    27: len 2; hex 0400; asc   ;;
   
   
   *** (2) TRANSACTION:
   TRANSACTION 544769, ACTIVE 0 sec updating or deleting
   mysql tables in use 1, locked 1
   LOCK WAIT 7 lock struct(s), heap size 1128, 4 row lock(s), undo log entries 2
   MySQL thread id 12010, OS thread handle 139953323235072, query id 228319 172.19.0.8 airflow_user updating
   UPDATE task_instance SET start_date='2022-05-11 07:47:49.745773', state='running', try_number=1, hostname='d2845d6331fd', job_id=9310 WHERE task_instance.task_id = 'sleep_246' AND task_instance.dag_id = 'test_timedelta' AND task_instance.run_id = 'scheduled__2022-05-09T11:10:00+00:00'
   
   *** (2) HOLDS THE LOCK(S):
   RECORD LOCKS space id 125 page no 47 n bits 120 index PRIMARY of table `airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap
   Record lock, heap no 55 PHYSICAL RECORD: n_fields 28; compact format; info bits 0
    0: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
    1: len 9; hex 736c6565705f323436; asc sleep_246;;
    2: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
    3: len 6; hex 000000085001; asc     P ;;
    4: len 7; hex 01000001411e2f; asc     A /;;
    5: len 7; hex 627b6a250b612d; asc b{j% a-;;
    6: SQL NULL;
    7: SQL NULL;
    8: len 7; hex 72756e6e696e67; asc running;;
    9: len 4; hex 80000001; asc     ;;
    10: len 12; hex 643238343564363333316664; asc d2845d6331fd;;
    11: len 4; hex 726f6f74; asc root;;
    12: len 4; hex 8000245e; asc   $^;;
    13: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
    14: len 7; hex 64656661756c74; asc default;;
    15: len 4; hex 80000002; asc     ;;
    16: len 20; hex 54696d6544656c746153656e736f724173796e63; asc TimeDeltaSensorAsync;;
    17: len 7; hex 627b6a240472e0; asc b{j$ r ;;
    18: SQL NULL;
    19: len 4; hex 80000002; asc     ;;
    20: len 5; hex 80057d942e; asc   } .;;
    21: len 4; hex 80000001; asc     ;;
    22: len 4; hex 800021c7; asc   ! ;;
    23: len 30; hex 36353061663737642d363762372d343166382d383439342d636637333061; asc 650af77d-67b7-41f8-8494-cf730a; (total 36 bytes);
    24: SQL NULL;
    25: SQL NULL;
    26: SQL NULL;
    27: len 2; hex 0400; asc   ;;
   
   
   *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 125 page no 231 n bits 264 index ti_state of table `airflow_db`.`task_instance` trx id 544769 lock_mode X locks rec but not gap waiting
   Record lock, heap no 180 PHYSICAL RECORD: n_fields 4; compact format; info bits 0
    0: len 6; hex 717565756564; asc queued;;
    1: len 14; hex 746573745f74696d6564656c7461; asc test_timedelta;;
    2: len 9; hex 736c6565705f323436; asc sleep_246;;
    3: len 30; hex 7363686564756c65645f5f323032322d30352d30395431313a31303a3030; asc scheduled__2022-05-09T11:10:00; (total 36 bytes);
   
   *** WE ROLL BACK TRANSACTION (1)
   ```
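   Reading the InnoDB dump: transaction (1) is the triggerer's `UPDATE ... WHERE task_instance.state != 'deferred'` from 172.19.0.5. It holds the `ti_state` secondary-index entry for `sleep_246` and waits for the same row's `PRIMARY` record. Transaction (2) is the worker's `UPDATE ... SET state='running'` from 172.19.0.8. It holds the `PRIMARY` record and waits for the `ti_state` entry, which it needs because it is changing the indexed `state` column. The two statements take the same two locks in opposite orders. The toy model below is only an illustration (InnoDB's real detector is more involved, and the lock labels are our own shorthand): it encodes the holds/waits from the dump as a wait-for graph and finds the cycle that led InnoDB to roll back transaction (1).

   ```python
   from typing import Dict, List, Optional, Set

   # Our own shorthand for the two records named in the dump.
   TI_STATE = "ti_state entry (sleep_246)"
   PRIMARY = "PRIMARY record (sleep_246)"

   TRIGGERER = "trx 544772 (triggerer)"
   WORKER = "trx 544769 (worker)"

   # From "HOLDS THE LOCK(S)" and "WAITING FOR THIS LOCK TO BE GRANTED".
   holds: Dict[str, Set[str]] = {TRIGGERER: {TI_STATE}, WORKER: {PRIMARY}}
   waiting_on: Dict[str, str] = {TRIGGERER: PRIMARY, WORKER: TI_STATE}


   def build_wait_for_graph(holds: Dict[str, Set[str]], waiting_on: Dict[str, str]) -> Dict[str, List[str]]:
       """Edge A -> B means transaction A waits for a lock held by transaction B."""
       owner = {lock: trx for trx, locks in holds.items() for lock in locks}
       graph: Dict[str, List[str]] = {trx: [] for trx in holds}
       for trx, lock in waiting_on.items():
           blocker = owner.get(lock)
           if blocker is not None and blocker != trx:
               graph.setdefault(trx, []).append(blocker)
       return graph


   def find_cycle(graph: Dict[str, List[str]]) -> Optional[List[str]]:
       """Depth-first search; return the first cycle found as a node list, or None."""
       visiting: Set[str] = set()  # nodes on the current DFS path
       done: Set[str] = set()      # nodes already fully explored without a cycle
       path: List[str] = []

       def visit(node: str) -> Optional[List[str]]:
           visiting.add(node)
           path.append(node)
           for nxt in graph.get(node, []):
               if nxt in visiting:  # back edge closes a cycle
                   return path[path.index(nxt):] + [nxt]
               if nxt not in done:
                   cycle = visit(nxt)
                   if cycle:
                       return cycle
           visiting.discard(node)
           path.pop()
           done.add(node)
           return None

       for start in graph:
           if start not in done:
               cycle = visit(start)
               if cycle:
                   return cycle
       return None


   cycle = find_cycle(build_wait_for_graph(holds, waiting_on))
   print(" -> ".join(cycle) if cycle else "no deadlock")
   ```

   If both statements took the row's locks in the same order, the second transaction would simply wait for the first to commit and the graph would contain no cycle.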
   
   Airflow env
   ```
   AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow_user:airflow_pass@mysql/airflow_db
   AIRFLOW__CORE__DEFAULT_TIMEZONE=KST
   AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
   AIRFLOW__CORE__LOAD_EXAMPLES=False
   AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=KST
   AIRFLOW_HOME=/home/deploy/airflow
   AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30
   AIRFLOW__CORE__EXECUTOR=CeleryExecutor
   AIRFLOW__WEBSERVER__SECRET_KEY=aoiuwernholo
   AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
   AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow_user:airflow_pass@mysql/airflow_db
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
