[
https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17003546#comment-17003546
]
Jarek Potiuk commented on AIRFLOW-2516:
---------------------------------------
Thanks for pointing to it, [~toopt4]. I do not think this query has a similar
problem.
In the scheduler_job query, the state is part of the filter/selection criteria
(.filter(models.TaskInstance.state.in_(old_states))).
In the query below, from models, the selection criteria are based on the primary
index only (dag_id, task_id, execution_date), which means that the primary index
will most likely be locked first. It looks like the problem with the
scheduler_job query was that the query engine needed to lock the state index
first, find all rows that match the query, and only after that lock the primary
index (to find all rows to update).
I will run some EXPLAINs when I am back from Xmas and try to verify all the
above statements.
{code:java}
session.query(TaskInstance).filter(
    TaskInstance.dag_id == dag_run.dag_id,
    TaskInstance.execution_date == dag_run.execution_date,
    TaskInstance.task_id.in_(task_ids)
).update({TaskInstance.state: State.SKIPPED,
          TaskInstance.start_date: now,
          TaskInstance.end_date: now},
         synchronize_session=False)
{code}
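As a rough illustration of the kind of EXPLAIN comparison mentioned above, here is a minimal sketch. It makes several assumptions: it uses SQLite from the Python standard library as a stand-in for MySQL, the table is a hypothetical cut-down version of task_instance, and both UPDATE statements are simplified shapes of the two queries rather than the real ones. It can only show which index the planner picks for each statement; it says nothing about InnoDB lock ordering, which would still need to be checked with EXPLAIN on MySQL itself.

```python
import sqlite3

# Hypothetical, cut-down stand-in for Airflow's task_instance table.
SCHEMA = """
CREATE TABLE task_instance (
    task_id TEXT NOT NULL,
    dag_id TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    state TEXT,
    PRIMARY KEY (task_id, dag_id, execution_date)
);
CREATE INDEX ti_state ON task_instance (state);
"""

# Simplified shape of the scheduler_job update: state is in the WHERE clause.
UPDATE_BY_STATE = (
    "UPDATE task_instance SET state = 'failed' "
    "WHERE state IN ('queued', 'scheduled')"
)

# Simplified shape of the models update: primary-key columns only.
UPDATE_BY_PK = (
    "UPDATE task_instance SET state = 'skipped' "
    "WHERE dag_id = 'd' AND execution_date = '2018-05-23' "
    "AND task_id IN ('t1', 't2')"
)


def build_connection():
    """Create an in-memory database containing the stand-in schema."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def explain(conn, sql):
    """Return the planner's human-readable steps for running sql."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    # The last column of each row holds the textual plan step.
    return [row[-1] for row in rows]


if __name__ == "__main__":
    conn = build_connection()
    for label, sql in (("by state", UPDATE_BY_STATE),
                       ("by primary key", UPDATE_BY_PK)):
        print(label, explain(conn, sql))
```

Running the same two statements through EXPLAIN on a MySQL instance with the real schema would be the actual verification; the sketch only shows the shape of the comparison.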
> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
> Key: AIRFLOW-2516
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
> Project: Apache Airflow
> Issue Type: Bug
> Components: DagRun
> Affects Versions: 1.8.0
> Reporter: Jeff Liu
> Priority: Major
>
>
>
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask:
> [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue:
> airflow run production_wipeout_wipe_manager.Carat Carat_20180227
> 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback
> (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in
> wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result =
> func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in
> _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in
> handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask:
> _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged =
> self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882,
> in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident,
> loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952,
> in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247,
> in load_on_pk_identity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884,
> in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854,
> in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925,
> in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return
> self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946,
> in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask:
> close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955,
> in _get_bind_args
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937,
> in _connection_from_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn =
> self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask:
> execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1040, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine,
> execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388,
> in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask:
> self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276,
> in _assert_active
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: %
> self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask:
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been
> rolled back due to a previous exception during flush. To begin a new
> transaction with this Session, first issue Session.rollback(). Original
> exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found
> when trying to get lock; try restarting transaction')
> [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s
> AND task_instance.dag_id = %s AND task_instance.execution_date = %s']
> [parameters: (u'queued', 'Carat_20180227',
> 'production_wipeout_wipe_manager.Carat',
> datetime.datetime(2018, 5, 23, 17, 41, 18, 815809))]
> (Background on this error at:
> http://sqlalche.me/e/e3q8){code}