[
https://issues.apache.org/jira/browse/AIRFLOW-2219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873982#comment-16873982
]
t oo commented on AIRFLOW-2219:
-------------------------------
I get the same error using trigger_dag in the CLI:
Environment: LocalExecutor, MySQL metastore, Airflow 1.10.3 –
sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1062,
"Duplicate entry 'redact' for key 'PRIMARY'") [SQL: u'INSERT INTO task_instance
[2019-06-26 20:20:37,124] {__init__.py:305} INFO - Filling up the DagBag from
/home/ec2-user/airflow/dags
Traceback (most recent call last):
File "/home/ec2-user/venv/bin/airflow", line 32, in <module>
args.func(args)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py",
line 74, in wrapper
return f(*args, **kwargs)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py",
line 233, in trigger_dag
execution_date=args.exec_date)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/client/local_client.py",
line 33, in trigger_dag
execution_date=execution_date)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 101, in trigger_dag
replace_microseconds=replace_microseconds,
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 77, in _trigger_dag
external_trigger=True,
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py",
line 73, in wrapper
return func(*args, **kwargs)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py",
line 4095, in create_dagrun
run.verify_integrity(session=session)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py",
line 69, in wrapper
return func(*args, **kwargs)
File
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py",
line 4934, in verify_integrity
session.commit()
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
line 1023, in commit
self.transaction.commit()
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
line 487, in commit
self._prepare_impl()
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
line 466, in _prepare_impl
self.session.flush()
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2446, in flush
self._flush(objects)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2584, in _flush
transaction.rollback(_capture_exception=True)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
line 67, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2544, in _flush
flush_context.execute()
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 416, in execute
rec.execute(self)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 583, in execute
uow,
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 245, in save_obj
insert,
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 1063, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
line 980, in execute
return meth(self, multiparams, params)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py",
line 273, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1099, in _execute_clauseelement
distilled_params,
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1240, in _execute_context
e, statement, parameters, cursor, context
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1458, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/util/compat.py",
line 296, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1216, in _execute_context
cursor, statement, parameters, context
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py",
line 107, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py",
line 234, in executemany
self._get_db().encoding)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py",
line 256, in _do_execute_many
rows += self.execute(sql + postfix)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py",
line 206, in execute
res = self._query(query)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py",
line 312, in _query
db.query(q)
File
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/connections.py",
line 224, in query
_mysql.connection.query(self, query)
> Race condition to DagRun.verify_integrity between Scheduler and Webserver
> -------------------------------------------------------------------------
>
> Key: AIRFLOW-2219
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2219
> Project: Apache Airflow
> Issue Type: Bug
> Components: database
> Affects Versions: 1.8.1, 1.9.0
> Reporter: Will Wong
> Priority: Trivial
>
> Symptoms:
> * Triggering dag causes the 404 nuke page with an error message along the
> lines of: {{psycopg2.IntegrityError: duplicate key value violates unique
> constraint "task_instance_pkey"}} when calling {{DagRun.verify_integrity}}
> Or
> * Similar error in scheduler log for dag file when scheduling a DAG.
> (Example exception at the end of description)
> This occurs because {{Dag.create_dagrun}} commits the dag_run entry to the
> database and then runs {{verify_integrity}} to add the task_instances
> immediately. However, the scheduler already picks up a dag run before all
> task_instances are created and also calls {{verify_integrity}} to create
> task_instances at the same time.
> I don't _think_ this actually breaks anything in particular. The exception
> happens either on the webpage or in the scheduler logs:
> * If it occurs in the UI, it just scares people into thinking something broke, but
> the task_instances will be created by the scheduler.
> * If the error shows up in the scheduler, the task_instances are created by
> the webserver and it continues processing the DAG during the next loop.
>
> I'm not sure if {{DagRun.verify_integrity}} is necessary for both
> {{SchedulerJob._process_task_instances}} and {{Dag.create_dagrun}} but
> perhaps we can just stick to one?
>
> {noformat}
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1170, in _execute_context
> context)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
> line 683, in do_executemany
> cursor.executemany(statement, parameters)
> psycopg2.IntegrityError: duplicate key value violates unique constraint
> "task_instance_pkey"
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0,
> chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in
> helper
> pickle_dags)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50,
> in wrapper
> result = func(*args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792,
> in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391,
> in _process_dags
> self._process_task_instances(dag, tis_out)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in
> _process_task_instances
> run.verify_integrity(session=session)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50,
> in wrapper
> result = func(*args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786,
> in verify_integrity
> session.commit()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 943, in commit
> self.transaction.commit()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 467, in commit
> self._prepare_impl()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 447, in _prepare_impl
> self.session.flush()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 2254, in flush
> self._flush(objects)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 2380, in _flush
> transaction.rollback(_capture_exception=True)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line
> 66, in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
> line 187, in reraise
> raise value
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 2344, in _flush
> flush_context.execute()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py",
> line 391, in execute
> rec.execute(self)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py",
> line 556, in execute
> uow
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line
> 181, in save_obj
> mapper, table, insert)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line
> 830, in _emit_insert_statements
> execute(statement, multiparams)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 948, in execute
> return meth(self, multiparams, params)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py",
> line 269, in _execute_on_connection
> return connection._execute_clauseelement(self, multiparams, params)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1060, in _execute_clauseelement
> compiled_sql, distilled_params
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1200, in _execute_context
> context)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1413, in _handle_dbapi_exception
> exc_info
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
> line 203, in raise_from_cause
> reraise(type(exception), exception, tb=exc_tb, cause=cause)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
> line 186, in reraise
> raise value.with_traceback(tb)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1170, in _execute_context
> context)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
> line 683, in do_executemany
> cursor.executemany(statement, parameters)
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0,
> chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)