Maneesh Sharma created AIRFLOW-1370:
---------------------------------------
Summary: Scheduler is crashing because of IntegrityError
Key: AIRFLOW-1370
URL: https://issues.apache.org/jira/browse/AIRFLOW-1370
Project: Apache Airflow
Issue Type: Bug
Components: celery, scheduler
Affects Versions: Airflow 1.8
Reporter: Maneesh Sharma
The scheduler crashes when multiple tasks are running on the Celery Executor. It
throws `{color:red}IntegrityError: (psycopg2.IntegrityError) duplicate key
value violates unique constraint "task_instance_pkey"{color}`. Below is the
complete stack trace of the error:
Process DagFileProcessor490-Process:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line
348, in helper
pickle_dags)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py",
line 53, in wrapper
result = func(*args, **kwargs)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line
1587, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line
1176, in _process_dags
self._process_task_instances(dag, tis_out)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line
880, in _process_task_instances
run.verify_integrity(session=session)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py",
line 53, in wrapper
result = func(*args, **kwargs)
File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py",
line 4117, in verify_integrity
session.commit()
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 906, in commit
self.transaction.commit()
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 461, in commit
self._prepare_impl()
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 441, in _prepare_impl
self.session.flush()
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2171, in flush
self._flush(objects)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2291, in _flush
transaction.rollback(_capture_exception=True)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
line 66, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2255, in _flush
flush_context.execute()
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 389, in execute
rec.execute(self)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 548, in execute
uow
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 181, in save_obj
mapper, table, insert)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 799, in _emit_insert_statements
execute(statement, multiparams)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 945, in execute
return meth(self, multiparams, params)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
line 263, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1053, in _execute_clauseelement
compiled_sql, distilled_params
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1189, in _execute_context
context)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1402, in _handle_dbapi_exception
exc_info
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
line 203, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1159, in _execute_context
context)
File
"/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py",
line 467, in do_executemany
cursor.executemany(statement, parameters)
IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique
constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(Hello_World_task-21,
Hello_World_Tasks, 2017-07-04 06:59:40) already exists.
[SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date,
end_date, duration, state, try_number, hostname, unixname, job_id, pool, queue,
priority_weight, operator, queued_dttm, pid) VALUES (%(task_id)s, %(dag_id)s,
%(execution_date)s, %(start_date)s, %(end_date)s, %(duration)s, %(state)s,
%(try_number)s, %(hostname)s, %(unixname)s, %(job_id)s, %(pool)s, %(queue)s,
%(priority_weight)s, %(operator)s, %(queued_dttm)s, %(pid)s)'] [parameters:
({'task_id': 'Hello_World_task-21', 'unixname': 'ubuntu', 'job_id': None,
'end_date': None, 'pool': None, 'queued_dttm': None, 'execution_date':
datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0,
'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id':
'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-20', 'unixname': 'ubuntu',
'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-23', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-22', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-25', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-24', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-27', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-26', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'} ... displaying 10 of 2002 total bound parameter
sets ... {'task_id': 'Hello_World_task-1783', 'unixname': 'ubuntu', 'job_id':
None, 'end_date': None, 'pool': None, 'queued_dttm': None, 'execution_date':
datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0,
'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id':
'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-1782', 'unixname':
'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None,
'start_date': None, 'operator': None, 'priority_weight': 2, 'hostname': u'',
'dag_id': 'Hello_World_Tasks'})]
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)