[ https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014594#comment-17014594 ]
ASF subversion and git services commented on AIRFLOW-2516:
----------------------------------------------------------
Commit 1a52182ea0fabb5c941e5e8c990db71097eb87a4 in airflow's branch
refs/heads/master from Jarek Potiuk
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=1a52182 ]
[AIRFLOW-2516] Fix mysql deadlocks (#6988)
Deadlocks were occurring in MySQL when task_instance was modified
by two queries at the same time. One query used state as a selection
criterion and updated it in the same statement, whereas the second
query just updated the state in the same table. The first query locked
the state index first and the primary index afterwards; the second
query locked the primary index first and the state index afterwards,
leading to deadlocks.

This change splits the first query into two independent ones.
The first query is a SELECT ... FOR UPDATE that selects all the task
instances to act on (this locks the primary index only), and the
second updates all affected task instances.

Note that the performance impact is negligible because this
query runs only once per scheduler loop, and its second part
(looping through the task instances) only happens when
there are manually modified DagRun states, so it only
runs to correct wrong DagRun states. This should happen
very infrequently.
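
The select-then-update split described in the commit message can be sketched roughly as follows. This is not the code from the commit, only a minimal illustration under stated assumptions: it uses Python's built-in sqlite3 module so that it runs anywhere, the function name change_state_in_two_steps and its lock_clause parameter are hypothetical, and the column names are taken from the UPDATE statement in the traceback quoted below. SQLite has no FOR UPDATE clause, so the sketch leaves it out by default; on MySQL one would append " FOR UPDATE" to the SELECT and use %s placeholders instead of ?.

```python
import sqlite3


def change_state_in_two_steps(conn, old_state, new_state, lock_clause=""):
    """Select the rows to act on first, then update each one by primary key.

    lock_clause is a hypothetical hook: pass " FOR UPDATE" on a database
    that supports it (e.g. MySQL); SQLite does not, so it defaults to "".
    """
    cur = conn.cursor()
    # Step 1: find the primary keys of the task instances to act on.
    cur.execute(
        "SELECT task_id, dag_id, execution_date FROM task_instance "
        "WHERE state = ?" + lock_clause,
        (old_state,),
    )
    keys = cur.fetchall()
    # Step 2: update the selected rows, addressing each by primary key only.
    cur.executemany(
        "UPDATE task_instance SET state = ? "
        "WHERE task_id = ? AND dag_id = ? AND execution_date = ?",
        [(new_state, *key) for key in keys],
    )
    conn.commit()
    return len(keys)


def demo():
    """Build a small in-memory table and run the two-step update on it."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        "task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT, "
        "PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
        [
            ("t1", "example_dag", "2018-05-23T17:41:18", "queued"),
            ("t2", "example_dag", "2018-05-23T17:41:18", "running"),
            ("t3", "example_dag", "2018-05-23T17:41:18", "queued"),
        ],
    )
    changed = change_state_in_two_steps(conn, "queued", "failed")
    rows = conn.execute(
        "SELECT task_id, state FROM task_instance ORDER BY task_id"
    ).fetchall()
    conn.close()
    return changed, rows


if __name__ == "__main__":
    print(demo())
```

The point of the split is that the UPDATE statements no longer filter on state themselves; they only address rows through the primary key, which matches the way the other writer touches the table.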
> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
> Key: AIRFLOW-2516
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
> Project: Apache Airflow
> Issue Type: Bug
> Components: DagRun
> Affects Versions: 1.8.0, 1.9.0, 1.10.0, 1.10.1, 1.10.2, 1.10.3, 1.10.4,
> 1.10.5, 1.10.6, 1.10.7
> Reporter: Jeff Liu
> Assignee: Jarek Potiuk
> Priority: Major
> Fix For: 1.10.8
>
> Attachments: Screenshot 2019-12-30 at 10.42.52.png,
> image-2019-12-30-10-48-41-313.png, image-2019-12-30-10-58-02-610.png,
> jobs.py, jobs_fixed_deadlock_possibly_1.9.py,
> jobs_fixed_deadlock_possibly_1.9.py,
> scheduler_job_fixed_deadlock_possibly_1.10.6.py
>
>
>
>
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask:
> [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue:
> airflow run production_wipeout_wipe_manager.Carat Carat_20180227
> 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback
> (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in
> wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result =
> func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in
> _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in
> handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask:
> _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged =
> self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882,
> in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident,
> loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952,
> in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247,
> in load_on_pk_identity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884,
> in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854,
> in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925,
> in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return
> self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946,
> in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask:
> close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955,
> in _get_bind_args
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937,
> in _connection_from_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn =
> self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask:
> execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1040, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine,
> execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388,
> in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask:
> self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276,
> in _assert_active
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: %
> self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask:
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been
> rolled back due to a previous exception during flush. To begin a new
> transaction with this Session, first issue Session.rollback(). Original
> exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found
> when trying to get lock; try restarting transaction')
> [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s
> AND task_instance.dag_id = %s AND task_instance.execution_date = %s']
> [parameters: (u'queued', 'Carat_20180227',
> 'production_wipeout_wipe_manager.Carat',
> datetime.datetime(2018, 5, 23, 17, 41, 18, 815809))]
> (Background on this error at: http://sqlalche.me/e/e3q8){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)