[
https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17005196#comment-17005196
]
Zhidong commented on AIRFLOW-2516:
----------------------------------
Hi [~higrys],
The problem still exists after changing synchronize_session to 'fetch',
deadlock error logs are the same as before.
I debugged *QUERY ONE* by setting engine.echo to True, find that all executed
statements are as follows:
*QUERY THREE* (synchronize_session=False)
In []: session.query(TaskInstance).filter(TaskInstance.state ==
'failed').update(\{TaskInstance.state: 'unknown'}, synchronize_session=False)
INFO sqlalchemy.engine.base.Engine UPDATE task_instance SET state=%(state)s
WHERE task_instance.state = %(state_1)s
INFO sqlalchemy.engine.base.Engine \{u'state_1': 'failed', 'state': 'unknown'}
*QUERY FOUR* (synchronize_session='fetch')
In []: session.query(TaskInstance).filter(TaskInstance.state ==
'unknown').update(\{TaskInstance.state: 'failed'}, synchronize_session='fetch')
INFO sqlalchemy.engine.base.Engine SELECT task_instance.task_id AS
task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
task_instance.execution_date AS task_instance_execution_date
FROM task_instance
WHERE task_instance.state = %(state_1)s
INFO sqlalchemy.engine.base.Engine \{u'state_1': 'unknown'}
INFO sqlalchemy.engine.base.Engine UPDATE task_instance SET state=%(state)s
WHERE task_instance.state = %(state_1)s
INFO sqlalchemy.engine.base.Engine \{u'state_1': 'unknown', 'state': 'failed'}
`fetch` did perform a select, and return primary keys for matched rows, but
those primary keys are not used to update `task_instance.state` in database.
So, for *UPDATE task_instance SET ...* part of these two queries, `fetch` and
`False` did the same thing.
From
[sqlalchemy.orm.persistence|https://github.com/sqlalchemy/sqlalchemy/blob/4aa43ecbd78e5a7dd3d983ca46a377af4e01877e/lib/sqlalchemy/orm/persistence.py#L1875]
we can see that `fetch` and `False` use the same logic to update values to
database.
What `fetch` does extra is that, it fetchs all primary keys for matched rows
before update, and use those primary keys to expire attributes of matched
instances in local session when update is completed.
We may need to manually split *QUERY ONE* to two queries to resolve this
problem.
> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
> Key: AIRFLOW-2516
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
> Project: Apache Airflow
> Issue Type: Bug
> Components: DagRun
> Affects Versions: 1.8.0
> Reporter: Jeff Liu
> Priority: Major
>
>
>
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask:
> [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue:
> airflow run production_wipeout_wipe_manager.Carat Carat_20180227
> 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback
> (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask:
> pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in
> wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result =
> func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in
> _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in
> handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask:
> session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask:
> _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged =
> self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882,
> in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident,
> loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952,
> in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247,
> in load_on_pk_i
> dentity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return
> q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884,
> in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854,
> in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret =
> list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925,
> in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return
> self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946,
> in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask:
> close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955,
> in _get_bind_ar
> s
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937,
> in _connection_f
> rom_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn =
> self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask:
> execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line
> 1040, in _connection
> _for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine,
> execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388,
> in _connection_
> for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask:
> self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276,
> in _assert_acti
> ve
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: %
> self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask:
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been
> rolled back due to a previou
> s exception during flush. To begin a new transaction with this Session, first
> issue Session.rollback(). Original exception was:
> (_mysql_exceptions.OperationalError) (1
> 213, 'Deadlock found when trying to get lock; try restarting transaction')
> [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s
> AND task_instance
> .dag_id = %s AND task_instance.execution_date = %s'] [parameters: (u'queued',
> 'Carat_20180227', 'production_wipeout_wipe_manager.Carat',
> datetime.datetime(2018, 5, 23,
> 17, 41, 18, 815809))] (Background on this error at:
> http://sqlalche.me/e/e3q8){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)