[
https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dooer updated AIRFLOW-492:
--------------------------
Comment: was deleted
(was: Hey guys, I'm seeing the same problem. I have a sub-dag (along with a
SubDagOperator) that contains two Python operators. The individual Python
operators complete successfully, but the SubDagOperator gets marked failed
because of the duplicate key error. This happens with some of my backfill
runs -- not all of them. Is there a workaround or a solution?
)
> Insert into dag_stats table results in failed task while task itself succeeded
> --------------------------------------------------------------------------------
>
> Key: AIRFLOW-492
> URL: https://issues.apache.org/jira/browse/AIRFLOW-492
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Bolke de Bruin
> Assignee: Siddharth Anand
> Priority: Critical
> Fix For: 1.8.1
>
> Attachments: subdag_test.py
>
>
> On some occasions a duplicate key seems to be inserted into dag_stats, which
> results in a task/dag run being marked failed while the task itself has
> succeeded.
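>
> To make the failure mode concrete, here is a minimal sketch of the suspected
> race, assuming only what the error shows: a dag_stats table whose primary key
> is (dag_id, state). The DagStat model, dag id, and in-memory engine below are
> illustrative stand-ins, not Airflow's actual code. Two sessions (think: a
> parent backfill and a subdag backfill, both running clean_dirty) recount the
> run states, neither sees an existing row, both INSERT the same key, and the
> second commit fails with the same duplicate key error as the log below.
>
> {code}
> # Minimal sketch of the suspected race (illustrative, not Airflow's code).
> # Assumes dag_stats has PRIMARY KEY (dag_id, state), per the 1062 error.
> from sqlalchemy import create_engine, Column, String, Integer, Boolean
> from sqlalchemy.ext.declarative import declarative_base
> from sqlalchemy.orm import sessionmaker
> from sqlalchemy.pool import StaticPool
>
> Base = declarative_base()
>
> class DagStat(Base):
>     __tablename__ = 'dag_stats'
>     dag_id = Column(String(250), primary_key=True)
>     state = Column(String(50), primary_key=True)
>     count = Column(Integer, nullable=False, default=0)
>     dirty = Column(Boolean, nullable=False, default=False)
>
> # Shared in-memory SQLite database standing in for the MySQL metadata db.
> engine = create_engine('sqlite://', poolclass=StaticPool,
>                        connect_args={'check_same_thread': False})
> Base.metadata.create_all(engine)
> Session = sessionmaker(bind=engine)
>
> # Two writers both recount and insert the same (dag_id, state) row.
> s1, s2 = Session(), Session()
> s1.add(DagStat(dag_id='example_subdag', state='success', count=3))
> s2.add(DagStat(dag_id='example_subdag', state='success', count=3))
> s1.commit()
> try:
>     s2.commit()  # violates the composite primary key, as in the report
> except Exception as exc:
>     print('duplicate key: %s' % exc)
> {code}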
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
>     self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
>     raise errorvalue
> IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> [2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.
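>
> For what it's worth, one way to avoid the collision would be to serialize the
> writers before the delete/insert pass, e.g. by locking the affected rows.
> This is only a sketch of the idea, reusing the illustrative DagStat model
> from the sketch above (and row locks alone do not cover every case, e.g.
> when no rows exist yet for the dag); it is not necessarily the fix that
> will land in 1.8.1:
>
> {code}
> # Sketch only: serialize concurrent stat rewrites for a single dag_id.
> # `counts` maps state -> number of dag runs currently in that state.
> def recompute_stats(session, dag_id, counts):
>     # Lock existing rows so a concurrent pass blocks here instead of
>     # racing to the INSERT (emits SELECT ... FOR UPDATE on MySQL/InnoDB).
>     session.query(DagStat).filter(
>         DagStat.dag_id == dag_id).with_for_update().all()
>     session.query(DagStat).filter(
>         DagStat.dag_id == dag_id).delete(synchronize_session=False)
>     for state, count in counts.items():
>         session.add(DagStat(dag_id=dag_id, state=state,
>                             count=count, dirty=False))
>     session.commit()
> {code}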
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)