[
https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16998737#comment-16998737
]
Zhidong commented on AIRFLOW-2516:
----------------------------------
Hi [~higrys], the problem occurs very frequently, especially when the
task_instance table is very large.
The deadlock happens when the worker and the scheduler both try to update
task_instance at the same time. MySQL rolls back one of the two transactions,
so either the worker or the scheduler can fail with this error.
Workarounds:
# try/except the deadlock exception (MySQL error 1213) in application code and
retry the transaction immediately.
# drop the `ti_state` index to avoid contention on the record locks of that
index.
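A minimal sketch of workaround 1, assuming each transaction is wrapped in a
retryable callable. `retry_on_deadlock` is a hypothetical helper, not part of
Airflow's API; in real code the caught exception would be
sqlalchemy.exc.OperationalError rather than a bare Exception:

```python
import time

# Substring that identifies MySQL error 1213 in the exception message.
DEADLOCK_MARKER = "Deadlock found when trying to get lock"

def retry_on_deadlock(fn, retries=3, delay=0.0):
    """Call fn(); if it fails with a MySQL deadlock (error 1213),
    retry the whole transaction up to `retries` more times.
    Hypothetical helper for illustration only."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:  # real code: sqlalchemy.exc.OperationalError
            # Re-raise anything that is not a deadlock, or if out of retries.
            if DEADLOCK_MARKER not in str(exc) or attempt == retries:
                raise
            time.sleep(delay)  # optional brief backoff before retrying
```

The retry is safe only because MySQL rolls back the losing transaction
completely, so re-running the whole statement repeats no work.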
Engine InnoDB status (from SHOW ENGINE INNODB STATUS)
------------------------
LATEST DETECTED DEADLOCK
------------------------
191216 8:52:09
*** (1) TRANSACTION:
TRANSACTION 4AB6056BF, ACTIVE 0 sec fetching rows
mysql tables in use 2, locked 2
LOCK WAIT 5 lock struct(s), heap size 1248, 5 row lock(s)
MySQL thread id 256467507, OS thread handle 0x7f973d815700, query id
14205100829 127.0.0.1 airflow Sending data
UPDATE task_instance, dag_run SET task_instance.state=NULL WHERE
task_instance.dag_id IN ('A_Dag', 'B_Dag') AND task_instance.state IN
('queued', 'scheduled') AND dag_run.dag_id = task_instance.dag_id AND
dag_run.execution_date = task_instance.execution_date AND dag_run.state !=
'running'
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 112745 n bits 128 index `PRIMARY` of table
`airflow`.`task_instance` trx id 4AB6056BF lock_mode X locks rec but not gap
waiting
*** (2) TRANSACTION:
TRANSACTION 4AB6056BC, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
4 lock struct(s), heap size 1248, 2 row lock(s), undo log entries 2
MySQL thread id 256620571, OS thread handle 0x7f973d531700, query id
14205100833 192.168.1.1 airflow Updating
UPDATE task_instance SET start_date='2019-12-16 08:51:35.486074',
state='running', try_number=1, hostname='192.168.1.1', unixname='airflow',
job_id=119942, operator='PythonOperator', pid=60059 WHERE task_instance.task_id
= 'X_Task' AND task_instance.dag_id = 'X_Dag' AND task_instance.execution_date
= '2019-12-16 08:50:00'
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 0 page no 112745 n bits 128 index `PRIMARY` of table
`airflow`.`task_instance` trx id 4AB6056BC lock_mode X locks rec but not gap
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 112079 n bits 232 index `ti_state` of table
`airflow`.`task_instance` trx id 4AB6056BC lock_mode X locks rec but not gap
waiting
*** WE ROLL BACK TRANSACTION (1)
MySQL general.log
191216 8:52:09
256467507 Query SELECT 1
256620571 Connect [email protected] as anonymous on
airflow
256620571 Query set autocommit=0
256467507 Query UPDATE task_instance, dag_run SET
task_instance.state='failed' WHERE task_instance.dag_id IN ('A_Dag', 'B_Dag')
AND task_instance.state IN ('up_for_retry') AND dag_run.dag_id =
task_instance.dag_id AND dag_run.execution_date = task_instance.execution_date
AND dag_run.state != 'running'
256620571 Query SELECT 1
256620571 Query SELECT task_instance.try_number AS
task_instance_try_number, task_instance.task_id AS task_instance_task_id,
task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS
task_instance_execution_date, task_instance.start_date AS
task_instance_start_date, task_instance.end_date AS task_instance_end_date,
task_instance.duration AS task_instance_duration, task_instance.state AS
task_instance_state, task_instance.max_tries AS task_instance_max_tries,
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
task_instance_unixname, task_instance.job_id AS task_instance_job_id,
task_instance.pool AS task_instance_pool, task_instance.queue AS
task_instance_queue, task_instance.priority_weight AS
task_instance_priority_weight, task_instance.operator AS
task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm,
task_instance.pid AS task_instance_pid FROM task_instance WHERE
task_instance.task_id = 'X_Task' AND task_instance.dag_id = 'X_Dag' AND
task_instance.execution_date = '2019-12-16 08:50:00'
256467507 Query commit
256467507 Query rollback
256467507 Query SELECT 1
256620571 Query INSERT INTO log (dttm, dag_id, task_id, event,
execution_date, owner, extra) VALUES ('2019-12-16 08:51:35.491292', 'X_Dag',
'X_Task', 'running', '2019-12-16 08:50:00', 'Airflow', NULL)
256467507 Query UPDATE task_instance, dag_run SET
task_instance.state=NULL WHERE task_instance.dag_id IN ('A_Dag', 'B_Dag') AND
task_instance.state IN ('queued', 'scheduled') AND dag_run.dag_id =
task_instance.dag_id AND dag_run.execution_date = task_instance.execution_date
AND dag_run.state != 'running'
256620571 Query UPDATE task_instance SET start_date='2019-12-16
08:51:35.486074', state='running', try_number=1, hostname='192.168.1.1',
unixname='airflow', job_id=119942, operator='PythonOperator', pid=60059 WHERE
task_instance.task_id = 'X_Task' AND task_instance.dag_id = 'X_Dag' AND
task_instance.execution_date = '2019-12-16 08:50:00'
256620571 Query commit
256620571 Query rollback
256620571 Quit
> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
> Key: AIRFLOW-2516
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
> Project: Apache Airflow
> Issue Type: Bug
> Components: DagRun
> Affects Versions: 1.8.0
> Reporter: Jeff Liu
> Priority: Major
>
>
>
> {code:python}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask:
> [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue:
> airflow run production_wipeout_wipe_manager.Carat Carat_20180227
> 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback
> (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in
> wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result =
> func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in
> _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in
> handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask:
> _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged =
> self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882,
> in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident,
> loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952,
> in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247,
> in load_on_pk_identity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884,
> in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854,
> in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925,
> in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return
> self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946,
> in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask:
> close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955,
> in _get_bind_args
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937,
> in _connection_from_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn =
> self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask:
> execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1040, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine,
> execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388,
> in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask:
> self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276,
> in _assert_active
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: %
> self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask:
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been
> rolled back due to a previous exception during flush. To begin a new
> transaction with this Session, first issue Session.rollback(). Original
> exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found
> when trying to get lock; try restarting transaction')
> [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s
> AND task_instance.dag_id = %s AND task_instance.execution_date = %s']
> [parameters: (u'queued',
> 'Carat_20180227', 'production_wipeout_wipe_manager.Carat',
> datetime.datetime(2018, 5, 23,
> 17, 41, 18, 815809))] (Background on this error at:
> http://sqlalche.me/e/e3q8){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)