[ https://issues.apache.org/jira/browse/AIRFLOW-738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814291#comment-15814291 ]

ASF subversion and git services commented on AIRFLOW-738:
---------------------------------------------------------

Commit e18d67dec4774946a35f7c34953bdfd7138595bf in incubator-airflow's branch 
refs/heads/v1-8-test from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e18d67d ]

[AIRFLOW-738] Commit deleted xcom items before insert

A delete/insert sequence within one transaction can lead
to a deadlocked transaction on MariaDB / MySQL.

The deletes, when they affect no rows, each still acquire a shared lock
(mode IX) on the end-of-table gap. Once the inserts are executed, that
shared lock is still held by all threads, and each insert intention lock
has to wait for the other threads to release their shared locks.
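
For illustration, here is a minimal sketch (not part of the patch) of the
interleaving described above. It assumes a local MariaDB/MySQL scratch
database using InnoDB at its default REPEATABLE READ isolation level; the
connection URL and the xcom_demo table are placeholders, not Airflow's schema.

{code}
# Hypothetical reproduction of the gap-lock deadlock: two threads each run
# DELETE (matching nothing) followed by INSERT inside a single transaction.
import threading
import time

from sqlalchemy import create_engine, text

engine = create_engine("mysql://user:pass@localhost/scratch")  # placeholder URL

with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE IF NOT EXISTS xcom_demo (k VARCHAR(64)) ENGINE=InnoDB"))

def delete_then_insert(worker_id):
    with engine.begin() as conn:  # one transaction for both statements
        # Matches no rows, but still takes a shared gap lock on the
        # end-of-table gap.
        conn.execute(text("DELETE FROM xcom_demo WHERE k = :k"),
                     {"k": "worker-%d" % worker_id})
        time.sleep(1)  # let the other thread take its gap lock as well
        # The insert intention lock conflicts with the gap lock held by the
        # other transaction; with both threads waiting, InnoDB rolls one back
        # with error 1213 ("Deadlock found when trying to get lock").
        conn.execute(text("INSERT INTO xcom_demo (k) VALUES (:k)"),
                     {"k": "worker-%d" % worker_id})

threads = [threading.Thread(target=delete_then_insert, args=(i,))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
{code}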

The solution is not to do the following within a single transaction:

1. Delete the rows you are about to insert, even when those rows aren't there.
2. Insert the rows.

In this case the risk of the delete committing without the insert is
relatively low, as it was the user's intention to run the task. If it
fails in between the two transactions, the task can simply be retried.
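
A rough sketch of the pattern the fix describes (commit the delete in its own
transaction before inserting), assuming a SQLAlchemy session and an XCom-like
mapped class; the names are illustrative, not the literal patch:

{code}
# Illustrative only: split the delete and the insert into two transactions so
# the gap lock taken by the delete is released before any thread inserts.
def set_xcom(session, XCom, key, value, execution_date, task_id, dag_id):
    # 1) Remove any previous value and commit immediately, releasing the
    #    gap lock the delete acquired even when it matched no rows.
    session.query(XCom).filter(
        XCom.key == key,
        XCom.execution_date == execution_date,
        XCom.task_id == task_id,
        XCom.dag_id == dag_id).delete()
    session.commit()

    # 2) Insert the new value in a fresh transaction. If the process dies
    #    between the two commits, re-running the task restores the value.
    session.add(XCom(key=key, value=value, execution_date=execution_date,
                     task_id=task_id, dag_id=dag_id))
    session.commit()
{code}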


> XCom: Deadlock found when trying to get lock; try restarting transaction
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-738
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-738
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.8
>            Reporter: Bolke de Bruin
>            Priority: Blocker
>
> When using the following DAG:
> {code}
> from datetime import datetime, timedelta
> import logging
> import pprint
> import random
> # The DAG object; we'll need this to instantiate a DAG
> from airflow import DAG
> # Operators; we need this to operate!
> from airflow.operators.python_operator import PythonOperator
> start_time = datetime.now().replace(minute=0, second=0, microsecond=0)
> start_time += timedelta(hours=-1)  # timedelta(days=-2)
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': start_time,
>     'email': ['[email protected]'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1)
>     # 'queue': 'bash_queue',
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
> dag = DAG(
>     'xcom_test',
>     default_args=default_args,
>     schedule_interval='@once')
> def upload_activity_status(pgconn_id, **context):
>     upstream_task_ids = context['task'].upstream_task_ids
>     logging.info(
>         "Getting status from upstream task {}".format(upstream_task_ids))
>     status = context['ti'].xcom_pull(task_ids=upstream_task_ids)
>     logging.info("Xcom pull results:\n{}".format(pprint.pformat(status)))
>     logging.info("Upload to DB here")
> upload_activity_status = PythonOperator(
>     task_id='upload_activity_status',
>     python_callable=upload_activity_status,
>     op_kwargs={'pgconn_id': 'postgres_conn'},
>     provide_context=True,
>     dag=dag)
> def poll_data(params, execution_date, **context):
>     logging.info("Test polling function for {data_stream}".format(**params))
>     status = random.random() < 0.5
>     output = dict(
>         data_stream=params['data_stream'],
>         timeperiod=execution_date + timedelta(hours=-1),
>         status=status
>     )
>     return output
> def poll_data_factory(data_stream, dag):
>     return PythonOperator(
>         task_id='poll_{}'.format(data_stream),
>         python_callable=poll_data,
>         params={u'data_stream': data_stream},
>         provide_context=True,
>         dag=dag
>     )
> poll_streams = []
> streams = ['stream' + str(i) for i in range(30)]
> for data_stream in streams:
>     poll = poll_data_factory(data_stream, dag)
>     poll_streams.append(poll)
>     upload_activity_status.set_upstream(poll)
> {code}
> The following error is thrown:
> {code}
> [2017-01-06 21:41:35,824] {jobs.py:1433} INFO - Heartbeating the scheduler
> Traceback (most recent call last):
>   File "/Users/bolke/Documents/dev/airflow_env/bin/airflow", line 4, in <module>
>     __import__('pkg_resources').run_script('airflow==1.7.2.dev0', 'airflow')
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 739, in run_script
>     self.require(requires)[0].run_script(script_name, ns)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1494, in run_script
>     exec(code, namespace, namespace)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/EGG-INFO/scripts/airflow", line 28, in <module>
>     args.func(args)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/bin/cli.py", line 380, in run
>     pool=args.pool,
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1334, in run
>     self.handle_failure(e, test_mode, context)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1407, in handle_failure
>     session.merge(self)
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1815, in merge
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1861, in _merge
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 831, in get
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 864, in _get_impl
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/loading.py", line 223, in load_on_ident
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2756, in one
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2726, in one_or_none
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2797, in __iter__
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2818, in _execute_and_instances
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2827, in _get_bind_args
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2809, in _connection_from_session
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 966, in connection
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 276, in _assert_active
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been 
> rolled back due to a previous exception during flush. To begin a new 
> transaction with this Session, first issue Session.rollback(). Original 
> exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found 
> when trying to get lock; try restarting transaction') [SQL: u'INSERT INTO 
> xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, 
> %s, now(), %s, %s, %s)'] [parameters: (u'return_value', 
> '\x80\x02}q\x00(U\x06statusq\x01\x89U\ntimeperiodq\x02cdatetime\ndatetime\nq\x03U\n\x07\xe1\x01\x06\x13\x00\x00\x00\x00\x00q\x04\x85q\x05Rq\x06U\x0bdata_streamq\x07U\x08stream26q\x08u.',
>  datetime.datetime(2017, 1, 6, 20, 0), 'poll_stream26', 'xcom_test')]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
