[ 
https://issues.apache.org/jira/browse/AIRFLOW-1370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873983#comment-16873983
 ] 

t oo commented on AIRFLOW-1370:
-------------------------------

I get the same error when using trigger_dag from the CLI:

 

Environment: LocalExecutor, MySQL metastore, Airflow 1.10.3 – 
sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1062, 
"Duplicate entry 'redact' for key 'PRIMARY'") [SQL: u'INSERT INTO task_instance 

 

[2019-06-26 20:20:37,124] {__init__.py:305} INFO - Filling up the DagBag from 
/home/ec2-user/airflow/dags
Traceback (most recent call last):
File "/home/ec2-user/venv/bin/airflow", line 32, in <module>
args.func(args)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
return f(*args, **kwargs)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", 
line 233, in trigger_dag
execution_date=args.exec_date)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/client/local_client.py",
 line 33, in trigger_dag
execution_date=execution_date)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
 line 101, in trigger_dag
replace_microseconds=replace_microseconds,
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
 line 77, in _trigger_dag
external_trigger=True,
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py",
 line 4095, in create_dagrun
run.verify_integrity(session=session)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", 
line 69, in wrapper
return func(*args, **kwargs)
File 
"/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py",
 line 4934, in verify_integrity
session.commit()
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
 line 1023, in commit
self.transaction.commit()
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
 line 487, in commit
self._prepare_impl()
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
 line 466, in _prepare_impl
self.session.flush()
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
 line 2446, in flush
self._flush(objects)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
 line 2584, in _flush
transaction.rollback(_capture_exception=True)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
 line 67, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py",
 line 2544, in _flush
flush_context.execute()
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
 line 416, in execute
rec.execute(self)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
 line 583, in execute
uow,
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/persistence.py",
 line 245, in save_obj
insert,
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/persistence.py",
 line 1063, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
 line 980, in execute
return meth(self, multiparams, params)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py",
 line 273, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
 line 1099, in _execute_clauseelement
distilled_params,
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
 line 1240, in _execute_context
e, statement, parameters, cursor, context
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
 line 1458, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/util/compat.py",
 line 296, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py",
 line 1216, in _execute_context
cursor, statement, parameters, context
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py",
 line 107, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", 
line 234, in executemany
self._get_db().encoding)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", 
line 256, in _do_execute_many
rows += self.execute(sql + postfix)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", 
line 206, in execute
res = self._query(query)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", 
line 312, in _query
db.query(q)
File 
"/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/connections.py",
 line 224, in query
_mysql.connection.query(self, query)

> Scheduler is crashing because of IntegrityError
> -----------------------------------------------
>
>                 Key: AIRFLOW-1370
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1370
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: 1.8.0
>            Reporter: Maneesh Sharma
>            Priority: Major
>
> Scheduler is crashing with multiple tasks running on the Celery Executor. It is 
> throwing `IntegrityError: (psycopg2.IntegrityError) duplicate key 
> value violates unique constraint "task_instance_pkey"`. Below is the 
> complete stack trace of error --
> Process DagFileProcessor490-Process:
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in 
> _bootstrap
>     self.run()
>   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", 
> line 348, in helper
>     pickle_dags)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py", 
> line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1587, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1176, in _process_dags
>     self._process_task_instances(dag, tis_out)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", 
> line 880, in _process_task_instances
>     run.verify_integrity(session=session)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py", 
> line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", 
> line 4117, in verify_integrity
>     session.commit()
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
> line 906, in commit
>     self.transaction.commit()
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
> line 461, in commit
>     self._prepare_impl()
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
> line 441, in _prepare_impl
>     self.session.flush()
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
> line 2171, in flush
>     self._flush(objects)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
> line 2291, in _flush
>     transaction.rollback(_capture_exception=True)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
>  line 66, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
> line 2255, in _flush
>     flush_context.execute()
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
>  line 389, in execute
>     rec.execute(self)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
>  line 548, in execute
>     uow
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
>  line 181, in save_obj
>     mapper, table, insert)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
>  line 799, in _emit_insert_statements
>     execute(statement, multiparams)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
> line 945, in execute
>     return meth(self, multiparams, params)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", 
> line 263, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
> line 1053, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
> line 1189, in _execute_context
>     context)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
> line 1402, in _handle_dbapi_exception
>     exc_info
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", 
> line 203, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
> line 1159, in _execute_context
>     context)
>   File 
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py",
>  line 467, in do_executemany
>     cursor.executemany(statement, parameters)
> IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique 
> constraint "task_instance_pkey"
> DETAIL:  Key (task_id, dag_id, execution_date)=(Hello_World_task-21, 
> Hello_World_Tasks, 2017-07-04 06:59:40) already exists.
>  [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, 
> start_date, end_date, duration, state, try_number, hostname, unixname, 
> job_id, pool, queue, priority_weight, operator, queued_dttm, pid) VALUES 
> (%(task_id)s, %(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, 
> %(duration)s, %(state)s, %(try_number)s, %(hostname)s, %(unixname)s, 
> %(job_id)s, %(pool)s, %(queue)s, %(priority_weight)s, %(operator)s, 
> %(queued_dttm)s, %(pid)s)'] [parameters: ({'task_id': 'Hello_World_task-21', 
> 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 
> 'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 
> 40), 'pid': None, 'try_number': 0, 'queue': 'default', 'duration': None, 
> 'state': None, 'start_date': None, 'operator': None, 'priority_weight': 2, 
> 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id': 
> 'Hello_World_task-20', 'unixname': 'ubuntu', 'job_id': None, 'end_date': 
> None, 'pool': None, 'queued_dttm': None, 'execution_date': 
> datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0, 
> 'queue': 'default', 'duration': None, 'state': None, 'start_date': None, 
> 'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 
> 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-23', 'unixname': 
> 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': 
> None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': 
> None, 'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 
> 'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'', 
> 'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-22', 
> 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 
> 'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 
> 40), 'pid': None, 'try_number': 0, 'queue': 'default', 'duration': None, 
> 'state': None, 'start_date': None, 'operator': None, 'priority_weight': 2, 
> 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id': 
> 'Hello_World_task-25', 'unixname': 'ubuntu', 'job_id': None, 'end_date': 
> None, 'pool': None, 'queued_dttm': None, 'execution_date': 
> datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0, 
> 'queue': 'default', 'duration': None, 'state': None, 'start_date': None, 
> 'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 
> 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-24', 'unixname': 
> 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': 
> None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': 
> None, 'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 
> 'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'', 
> 'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-27', 
> 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 
> 'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 
> 40), 'pid': None, 'try_number': 0, 'queue': 'default', 'duration': None, 
> 'state': None, 'start_date': None, 'operator': None, 'priority_weight': 2, 
> 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id': 
> 'Hello_World_task-26', 'unixname': 'ubuntu', 'job_id': None, 'end_date': 
> None, 'pool': None, 'queued_dttm': None, 'execution_date': 
> datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0, 
> 'queue': 'default', 'duration': None, 'state': None, 'start_date': None, 
> 'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 
> 'Hello_World_Tasks'}  ... displaying 10 of 2002 total bound parameter sets 
> ...  {'task_id': 'Hello_World_task-1783', 'unixname': 'ubuntu', 'job_id': 
> None, 'end_date': None, 'pool': None, 'queued_dttm': None, 'execution_date': 
> datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0, 
> 'queue': 'default', 'duration': None, 'state': None, 'start_date': None, 
> 'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 
> 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-1782', 'unixname': 
> 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': 
> None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': 
> None, 'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 
> 'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'', 
> 'dag_id': 'Hello_World_Tasks'})]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to