Matus Valo created AIRFLOW-1039: ----------------------------------- Summary: Airflow raises IntegrityError during parallel DAG trigger Key: AIRFLOW-1039 URL: https://issues.apache.org/jira/browse/AIRFLOW-1039 Project: Apache Airflow Issue Type: Bug Components: DagRun Affects Versions: Airflow 1.8 Reporter: Matus Valo Priority: Minor
When Two concurrent processes are trying to trigger the same dag with the same execution date at the same time, the IntegrityError is thrown by SQLAlchemy: uwsgi[15887]: [2017-03-24 12:51:38,074] {app.py:1587} ERROR - Exception on / [POST] uwsgi[15887]: Traceback (most recent call last): uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app uwsgi[15887]: response = self.full_dispatch_request() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1641, in full_dispatch_request uwsgi[15887]: rv = self.handle_user_exception(e) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1544, in handle_user_exception uwsgi[15887]: reraise(exc_type, exc_value, tb) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1639, in full_dispatch_request uwsgi[15887]: rv = self.dispatch_request() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1625, in dispatch_request uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args) uwsgi[15887]: File "./ws.py", line 21, in hello uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 56, in trigger_dag uwsgi[15887]: external_trigger=True uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper uwsgi[15887]: result = func(*args, **kwargs) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 3377, in create_dagrun uwsgi[15887]: session.commit() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 874, in commit uwsgi[15887]: self.transaction.commit() 
uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 461, in commit uwsgi[15887]: self._prepare_impl() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 441, in _prepare_impl uwsgi[15887]: self.session.flush() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2139, in flush uwsgi[15887]: self._flush(objects) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2259, in _flush uwsgi[15887]: transaction.rollback(_capture_exception=True) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__ uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2223, in _flush uwsgi[15887]: flush_context.execute() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute uwsgi[15887]: rec.execute(self) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute uwsgi[15887]: uow uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj uwsgi[15887]: mapper, table, insert) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 835, in _emit_insert_statements uwsgi[15887]: execute(statement, params) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute uwsgi[15887]: return meth(self, multiparams, params) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection uwsgi[15887]: return 
connection._execute_clauseelement(self, multiparams, params) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement uwsgi[15887]: compiled_sql, distilled_params uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context uwsgi[15887]: context) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2139, in flush uwsgi[15887]: self._flush(objects) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2259, in _flush uwsgi[15887]: transaction.rollback(_capture_exception=True) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__ uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2223, in _flush uwsgi[15887]: flush_context.execute() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute uwsgi[15887]: rec.execute(self) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute uwsgi[15887]: uow uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj uwsgi[15887]: mapper, table, insert) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 835, in _emit_insert_statements uwsgi[15887]: execute(statement, params) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute uwsgi[15887]: return meth(self, multiparams, params) uwsgi[15887]: File 
"/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, params) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement uwsgi[15887]: compiled_sql, distilled_params uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context uwsgi[15887]: context) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception uwsgi[15887]: exc_info uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause uwsgi[15887]: reraise(type(exception), exception, tb=exc_tb, cause=cause) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context uwsgi[15887]: context) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute uwsgi[15887]: cursor.execute(statement, parameters) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/cursors.py", line 205, in execute uwsgi[15887]: self.errorhandler(self, exc, value) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler uwsgi[15887]: raise errorclass, errorvalue uwsgi[15887]: IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'poc_dag2-2017-03-24 12:51:37.000000' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12, 51, 37), None, u'running', 
'4ac49276-10cb-11e7-8197-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/matus/dev/airflowtest/input2/data20:51:30.789572200.gzq\x03as.')] This is not consistent with AirflowException returned by trigger_dag() function. Moreover, the session is not rolled back, hence also another exception is occurring: uwsgi[15887]: [2017-03-24 12:55:54,105] ERROR in app: Exception on / [POST] uwsgi[15887]: Traceback (most recent call last): uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app uwsgi[15887]: response = self.full_dispatch_request() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1641, in full_dispatch_request uwsgi[15887]: rv = self.handle_user_exception(e) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1544, in handle_user_exception uwsgi[15887]: reraise(exc_type, exc_value, tb) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1639, in full_dispatch_request uwsgi[15887]: rv = self.dispatch_request() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1625, in dispatch_request uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args) uwsgi[15887]: File "./ws.py", line 21, in hello uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 29, in trigger_dag uwsgi[15887]: dag = dagbag.get_dag(dag_id) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 200, in get_dag uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 2549, in get_current 
uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2731, in first uwsgi[15887]: ret = list(self[0:1]) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2523, in __getitem__ uwsgi[15887]: return list(res) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2831, in __iter__ uwsgi[15887]: [2017-03-24 12:55:54,105] {app.py:1587} ERROR - Exception on / [POST] uwsgi[15887]: Traceback (most recent call last): uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app uwsgi[15887]: response = self.full_dispatch_request() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1641, in full_dispatch_request uwsgi[15887]: rv = self.handle_user_exception(e) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1544, in handle_user_exception uwsgi[15887]: reraise(exc_type, exc_value, tb) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1639, in full_dispatch_request uwsgi[15887]: rv = self.dispatch_request() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1625, in dispatch_request uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args) uwsgi[15887]: File "./ws.py", line 21, in hello uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 29, in trigger_dag uwsgi[15887]: dag = dagbag.get_dag(dag_id) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 200, in get_dag 
uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 2549, in get_current uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2731, in first uwsgi[15887]: ret = list(self[0:1]) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2523, in __getitem__ uwsgi[15887]: return list(res) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2831, in __iter__ uwsgi[15887]: return self._execute_and_instances(context) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2852, in _execute_and_instances uwsgi[15887]: close_with_result=True) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2861, in _get_bind_args uwsgi[15887]: **kw uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2843, in _connection_from_session uwsgi[15887]: conn = self.session.connection(**kw) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 966, in connection uwsgi[15887]: execution_options=execution_options) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 971, in _connection_for_bind uwsgi[15887]: engine, execution_options) uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 382, in _connection_for_bind uwsgi[15887]: self._assert_active() uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276, in _assert_active uwsgi[15887]: % self._rollback_exception uwsgi[15887]: InvalidRequestError: This Session's 
transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'poc_dag2-2017-03-24 12:55:51.000000' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12, 55, 51), None, u'running', 'e1c78296-10cb-11e7-9e34-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/home/matus/dev/airflowtest/input2/data20:55:49.589767900.gzq\x03as.')]

As an example, here is a simple web service that raises these exceptions when multiple parallel clients try to process a file:

from uuid import uuid1
import json
from os.path import join
from datetime import datetime

from flask import Flask
from flask import request

app = Flask(__name__)


@app.route("/", methods=['POST'])
def hello():
    input_files = list()
    for f in request.files.values():
        fname = join('/home/matus/dev/airflowtest/input', f.filename)
        f.save(fname)
        input_files.append(fname)

    from airflow.api.common.experimental.trigger_dag import trigger_dag
    trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}), execution_date=datetime.now())
    return '{"status": "OK"}'


if __name__ == "__main__":
    app.run()

-- This message was sent by Atlassian JIRA (v6.3.15#6346)