NickYadance opened a new issue, #27000:
URL: https://github.com/apache/airflow/issues/27000

   ### Apache Airflow version
   
   Other Airflow 2 version
   
   ### What happened
   
   Discussion #22553 covers this but has no detailed trace, and #23639 is a 
similar issue. The triggerer occasionally dies due to a DB transaction 
deadlock. In my case it dies 5-6 times per day.
   
   MySQL engine status
   ```
   ------------------------
   LATEST DETECTED DEADLOCK
   ------------------------
   2022-10-11 08:33:52 139737395513088
   *** (1) TRANSACTION:
   TRANSACTION 5858164555, ACTIVE 0 sec fetching rows
   mysql tables in use 1, locked 1
   LOCK WAIT 143 lock struct(s), heap size 24696, 2 row lock(s)
   MySQL thread id 3080443983, OS thread handle 139736854193920, query id 
89316169955 10.244.3.94 airflow Searching rows for update
   UPDATE task_instance SET state='scheduled', trigger_id=NULL, 
next_method='__fail__', next_kwargs='{\"__var\": {\"error\": 
\"Trigger/execution timeout\"}, \"__type\": \"dict\"}' WHERE 
task_instance.state = 'deferred' AND task_instance.trigger_timeout < 
'2022-10-11 08:33:52.708635'
   
   *** (1) HOLDS THE LOCK(S):
   RECORD LOCKS space id 522 page no 750798 n bits 200 index ti_state of table 
`airflow`.`task_instance` trx id 5858164555 lock_mode X locks rec but not gap
   Record lock, heap no 43 PHYSICAL RECORD: n_fields 5; compact format; info 
bits 0
    0: len 8; hex 6465666572726564; asc deferred;;
    1: len 30; hex 
64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 
42 bytes);
    2: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
    3: len 30; hex 
7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc 
scheduled__2022-10-10T23:00:00; (total 36 bytes);
    4: len 4; hex 7fffffff; asc     ;;
   
   
   *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 522 page no 676208 n bits 104 index PRIMARY of table 
`airflow`.`task_instance` trx id 5858164555 lock_mode X locks rec but not gap 
waiting
   Record lock, heap no 35 PHYSICAL RECORD: n_fields 29; compact format; info 
bits 0
    0: len 30; hex 
64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 
42 bytes);
    1: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
    2: len 30; hex 
7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc 
scheduled__2022-10-10T23:00:00; (total 36 bytes);
    3: len 4; hex 7fffffff; asc     ;;
    4: len 6; hex 00015d2c7f56; asc   ], V;;
    5: len 7; hex 010000024b1824; asc     K $;;
    6: len 7; hex 6344b21f0a3ee2; asc cD   > ;;
    7: SQL NULL;
    8: SQL NULL;
    9: len 9; hex 7363686564756c6564; asc scheduled;;
    10: len 4; hex 80000000; asc     ;;
    11: len 30; hex 
616972666c6f772d63656c6572792d776f726b65722d342e616972666c6f; asc 
airflow-celery-worker-4.airflo; (total 71 bytes);
    12: len 4; hex 726f6f74; asc root;;
    13: len 4; hex 80a4fa25; asc    %;;
    14: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
    15: len 13; hex 63656c6572792d776f726b6572; asc celery-worker;;
    16: len 4; hex 80000005; asc     ;;
    17: len 22; hex 446570656e64656e63794d61726b657253656e736f72; asc Sensor;;
    18: len 7; hex 6345293d071e5a; asc cE)=  Z;;
    19: len 4; hex 8001e7ff; asc     ;;
    20: len 4; hex 80000002; asc     ;;
    21: len 5; hex 80047d942e; asc   } .;;
    22: len 4; hex 80000001; asc     ;;
    23: len 4; hex 808bc44c; asc    L;;
    24: len 30; hex 
35633163383065342d383438632d343564312d383265652d356233633766; asc 
5c1c80e4-848c-45d1-82ee-5b3c7f; (total 36 bytes);
    25: SQL NULL;
    26: len 5; hex 99ae180020; asc      ;;
    27: len 18; hex 657865637574655f6f6e5f74726967676572; asc 
execute_on_trigger;;
    28: len 30; hex 
00020060001200050017000600001d000c5b005f5f7661725f5f74797065; asc    `          
   [ __var__type; (total 97 bytes);
   
   
   *** (2) TRANSACTION:
   TRANSACTION 5858164566, ACTIVE 0 sec updating or deleting
   mysql tables in use 1, locked 1
   LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s), undo log entries 1
   MySQL thread id 3080393170, OS thread handle 139729109509888, query id 
89316170062 10.244.3.92 airflow updating
   UPDATE task_instance SET state='scheduled', trigger_id=NULL, 
next_kwargs='{\"__var\": {\"event\": {\"__var\": 1665477231.072732, \"__type\": 
\"datetime\"}}, \"__type\": \"dict\"}' WHERE task_instance.task_id = 'taskid' 
AND task_instance.dag_id = 'dagidwp_long-dag1' AND task_instance.run_id = 
'scheduled__2022-10-10T23:00:00+00:00' AND task_instance.map_index = -1
   
   *** (2) HOLDS THE LOCK(S):
   RECORD LOCKS space id 522 page no 676208 n bits 104 index PRIMARY of table 
`airflow`.`task_instance` trx id 5858164566 lock_mode X locks rec but not gap
   Record lock, heap no 35 PHYSICAL RECORD: n_fields 29; compact format; info 
bits 0
    0: len 30; hex 
64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 
42 bytes);
    1: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
    2: len 30; hex 
7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc 
scheduled__2022-10-10T23:00:00; (total 36 bytes);
    3: len 4; hex 7fffffff; asc     ;;
    4: len 6; hex 00015d2c7f56; asc   ], V;;
    5: len 7; hex 010000024b1824; asc     K $;;
    6: len 7; hex 6344b21f0a3ee2; asc cD   > ;;
    7: SQL NULL;
    8: SQL NULL;
    9: len 9; hex 7363686564756c6564; asc scheduled;;
    10: len 4; hex 80000000; asc     ;;
    11: len 30; hex 
616972666c6f772d63656c6572792d776f726b65722d342e616972666c6f; asc 
airflow-celery-worker-4.airflo; (total 71 bytes);
    12: len 4; hex 726f6f74; asc root;;
    13: len 4; hex 80a4fa25; asc    %;;
    14: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
    15: len 13; hex 63656c6572792d776f726b6572; asc celery-worker;;
    16: len 4; hex 80000005; asc     ;;
    17: len 22; hex 446570656e64656e63794d61726b657253656e736f72; asc Sensor;;
    18: len 7; hex 6345293d071e5a; asc cE)=  Z;;
    19: len 4; hex 8001e7ff; asc     ;;
    20: len 4; hex 80000002; asc     ;;
    21: len 5; hex 80047d942e; asc   } .;;
    22: len 4; hex 80000001; asc     ;;
    23: len 4; hex 808bc44c; asc    L;;
    24: len 30; hex 
35633163383065342d383438632d343564312d383265652d356233633766; asc 
5c1c80e4-848c-45d1-82ee-5b3c7f; (total 36 bytes);
    25: SQL NULL;
    26: len 5; hex 99ae180020; asc      ;;
    27: len 18; hex 657865637574655f6f6e5f74726967676572; asc 
execute_on_trigger;;
    28: len 30; hex 
00020060001200050017000600001d000c5b005f5f7661725f5f74797065; asc    `          
   [ __var__type; (total 97 bytes);
   
   
   *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 522 page no 750798 n bits 200 index ti_state of table 
`airflow`.`task_instance` trx id 5858164566 lock_mode X locks rec but not gap 
waiting
   Record lock, heap no 43 PHYSICAL RECORD: n_fields 5; compact format; info 
bits 0
    0: len 8; hex 6465666572726564; asc deferred;;
    1: len 30; hex 
64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 
42 bytes);
    2: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
    3: len 30; hex 
7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc 
scheduled__2022-10-10T23:00:00; (total 36 bytes);
    4: len 4; hex 7fffffff; asc     ;;
   
   *** WE ROLL BACK TRANSACTION (2)
   ```
   
   Trigger exit log
   ```
    File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", 
line 319, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 
254, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) 
(1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: UPDATE task_instance SET state=%s, trigger_id=%s, next_kwargs=%s WHERE 
task_instance.task_id = %s AND task_instance.dag_id = %s AND 
task_instance.run_id = %s AND task_instance.map_index = %s]
   [parameters: (<TaskInstanceState.SCHEDULED: 'scheduled'>, None, '{"__var": 
{"event": {"__var": 1665477231.072732, "__type": "datetime"}}, "__type": 
"dict"}', 'taskid', 'dagid', 'scheduled__2022-10-10T23:00:00+00:00', -1)]
   (Background on this error at: http://sqlalche.me/e/14/e3q8)
   [2022-10-11 08:33:52,729] {triggerer_job.py:111} INFO - Waiting for triggers 
to clean up
   [2022-10-11 08:33:53,569] {triggerer_job.py:117} INFO - Exited trigger loop
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1706, in _execute_context
       cursor, statement, parameters, context
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py",
 line 716, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 
206, in execute
       res = self._query(query)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 
319, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 
254, in query
       _mysql.connection.query(self, query)
   MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to 
get lock; try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/share/miniconda2/envs/airflow/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 
38, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", 
line 51, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 
99, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/triggerer_command.py",
 line 68, in triggerer
       job.run()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", 
line 244, in run
       self._execute()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job.py",
 line 106, in _execute
       self._run_trigger_loop()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job.py",
 line 131, in _run_trigger_loop
       self.handle_events()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job.py",
 line 160, in handle_events
       Trigger.submit_event(trigger_id=trigger_id, event=event)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/miniconda2/envs/airflow/lib/python3.7/contextlib.py", line 119, 
in __exit__
       next(self.gen)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 33, in create_session
       session.commit()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 1423, in commit
       self._transaction.commit(_to_root=self.future)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 829, in commit
       self._prepare_impl()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 808, in _prepare_impl
       self.session.flush()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 3255, in flush
       self._flush(objects)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 3395, in _flush
       transaction.rollback(_capture_exception=True)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py",
 line 72, in __exit__
       with_traceback=exc_tb,
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 3355, in _flush
       flush_context.execute()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py",
 line 453, in execute
       rec.execute(self)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py",
 line 630, in execute
       uow,
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py",
 line 239, in save_obj
       update,
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py",
 line 999, in _emit_update_statements
       statement, multiparams, execution_options=execution_options
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1520, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", 
line 314, in _execute_on_connection
       self, multiparams, params, execution_options
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1399, in _execute_clauseelement
       cache_hit=cache_hit,
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1749, in _execute_context
       e, statement, parameters, cursor, context
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1930, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1706, in _execute_context
       cursor, statement, parameters, context
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py",
 line 716, in do_execute
       cursor.execute(statement, parameters)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 
206, in execute
       res = self._query(query)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 
319, in _query
       db.query(q)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 
254, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) 
(1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: UPDATE task_instance SET state=%s, trigger_id=%s, next_kwargs=%s WHERE 
task_instance.task_id = %s AND task_instance.dag_id = %s AND 
task_instance.run_id = %s AND task_instance.map_index = %s]
   [parameters: (<TaskInstanceState.SCHEDULED: 'scheduled'>, None, '{"__var": 
{"event": {"__var": 1665477231.072732, "__type": "datetime"}}, "__type": 
"dict"}', 'taskid', 'dagid', 'scheduled__2022-10-10T23:00:00+00:00', -1)]
   (Background on this error at: http://sqlalche.me/e/14/e3q8)
   ```
   
   These two queries cause the deadlock. This query holds a row lock on the 
primary index (`dag_id`, `task_id`, `run_id`, `map_index`) and waits for a 
lock on the secondary index.
   
https://github.com/apache/airflow/blob/0d78ba560dec2e7ea2670744800864906622a4a4/airflow/models/trigger.py#L118-L135
   
   This query holds a row lock on the secondary index on `state` (`ti_state` 
in the engine status) and waits for the primary index lock, which completes 
the deadlock.
   
https://github.com/apache/airflow/blob/0d78ba560dec2e7ea2670744800864906622a4a4/airflow/jobs/scheduler_job.py#L1461-L1484
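
The lock cycle described above can be checked mechanically. The following is a minimal Python sketch, not Airflow or InnoDB code: it encodes what each transaction holds and waits for, as reported in the engine status, and looks for a cycle in the resulting wait-for graph. The names `Txn` and `find_deadlock` are made up for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Txn:
    """One transaction from the InnoDB deadlock report (illustrative model)."""
    name: str
    holds: str      # lock the transaction already holds
    waits_for: str  # lock the transaction is blocked on


def find_deadlock(txns: List[Txn]) -> Optional[List[str]]:
    """Return the names of transactions forming a wait cycle, or None."""
    holder: Dict[str, Txn] = {t.holds: t for t in txns}
    for start in txns:
        path = [start.name]
        current = start
        # Follow "waits for the lock held by ..." edges until they end or loop.
        while current.waits_for in holder:
            current = holder[current.waits_for]
            if current.name == start.name:
                return path
            if current.name in path:
                break  # a cycle exists, but it does not include `start`
            path.append(current.name)
    return None


# Locks as named in the engine status above.
timeout_update = Txn(
    "(1) scheduler timeout UPDATE",
    holds="ti_state record",
    waits_for="PRIMARY record",
)
event_update = Txn(
    "(2) triggerer submit_event UPDATE",
    holds="PRIMARY record",
    waits_for="ti_state record",
)

cycle = find_deadlock([timeout_update, event_update])
print(cycle)
```

Each transaction takes the two index locks in the opposite order, so each ends up waiting on the lock the other already holds.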
   
   #22553 and #23639 propose different solutions:
   1. Add `with_row_lock` to the queries so the selected rows are locked up 
front, avoiding lock contention.
   2. Add a retry.
   
   As for retrying, the preceding method already has retry logic.
   
https://github.com/apache/airflow/blob/0d78ba560dec2e7ea2670744800864906622a4a4/airflow/models/trigger.py#L94-L116
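
As an illustration of the retry option, here is a minimal sketch of re-running a transactional function when MySQL reports error 1213. It uses only the standard library. `DeadlockError`, `retry_on_deadlock`, and `flaky_submit_event` are hypothetical names: they stand in for the driver's `OperationalError` and for Airflow's own retry helpers, which this sketch does not reproduce.

```python
import functools
import random
import time

MYSQL_DEADLOCK_ERRNO = 1213  # "Deadlock found when trying to get lock"


class DeadlockError(Exception):
    """Hypothetical stand-in for the driver's OperationalError."""

    def __init__(self, errno: int, message: str) -> None:
        super().__init__(errno, message)
        self.errno = errno


def retry_on_deadlock(max_attempts: int = 3, base_delay: float = 0.01):
    """Re-run the wrapped function when it fails with a deadlock error."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except DeadlockError as exc:
                    # Give up on other errors or once attempts are exhausted.
                    if exc.errno != MYSQL_DEADLOCK_ERRNO or attempt == max_attempts:
                        raise
                    # Back off with jitter so both sides do not retry in lockstep.
                    time.sleep(base_delay * attempt * (1 + random.random()))

        return wrapper

    return decorator


calls = {"count": 0}


@retry_on_deadlock(max_attempts=3)
def flaky_submit_event() -> str:
    """Simulated transaction: chosen as the deadlock victim on the first call."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise DeadlockError(MYSQL_DEADLOCK_ERRNO, "Deadlock found when trying to get lock")
    return "committed"


result = flaky_submit_event()
print(result, calls["count"])
```

The retry has to wrap the whole transaction rather than the single statement, since the engine status shows that the victim transaction is rolled back.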
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

