[
https://issues.apache.org/jira/browse/AIRFLOW-2219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036815#comment-17036815
]
wei chea commented on AIRFLOW-2219:
-----------------------------------
We seem to be facing the same issue as well: the webserver simply crashes.
We are using the API to trigger the DAG.
Does anyone know how we can troubleshoot this? We have been looking at the
webserver but didn't see any error log.
{code:java}
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key
value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_hive_alert_task, default,
2020-02-13 06:54:23.37372+00) already exists.
{code}
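The failure mode can be sketched outside Airflow: two processes (here, the webserver and the scheduler) race to insert the same (task_id, dag_id, execution_date) row, and whichever loses hits the unique constraint. A tolerant caller would treat the duplicate as benign. This is a minimal sketch using sqlite3, not Airflow's actual code; the table layout and names are illustrative:

```python
import sqlite3

# A table with the same composite primary key the traceback shows
# being violated ("task_instance_pkey").
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        task_id TEXT, dag_id TEXT, execution_date TEXT,
        PRIMARY KEY (task_id, dag_id, execution_date)
    )
""")

def verify_integrity(conn, task_id, dag_id, execution_date):
    """Stand-in for DagRun.verify_integrity: insert the missing task
    instance, tolerating the case where a concurrent caller won the race."""
    try:
        conn.execute(
            "INSERT INTO task_instance VALUES (?, ?, ?)",
            (task_id, dag_id, execution_date),
        )
        conn.commit()
        return True  # we created the row
    except sqlite3.IntegrityError:
        conn.rollback()  # another process already created it; not an error
        return False

key = ("create_hive_alert_task", "default", "2020-02-13 06:54:23")
print(verify_integrity(conn, *key))  # True  -- e.g. the webserver wins
print(verify_integrity(conn, *key))  # False -- e.g. the scheduler, racing
```

With this pattern the losing writer simply observes that the row exists and moves on, instead of surfacing an {{IntegrityError}} to the user.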
> Race condition to DagRun.verify_integrity between Scheduler and Webserver
> -------------------------------------------------------------------------
>
> Key: AIRFLOW-2219
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2219
> Project: Apache Airflow
> Issue Type: Bug
> Components: database
> Affects Versions: 1.8.1, 1.9.0
> Reporter: Will Wong
> Priority: Trivial
>
> Symptoms:
> * Triggering a DAG causes the 404 "nuke" page with an error message along
> the lines of: {{psycopg2.IntegrityError: duplicate key value violates unique
> constraint "task_instance_pkey"}} when calling {{DagRun.verify_integrity}}
> Or
> * Similar error in scheduler log for dag file when scheduling a DAG.
> (Example exception at the end of description)
> This occurs because {{Dag.create_dagrun}} commits the dag_run entry to the
> database and then immediately runs {{verify_integrity}} to add the
> task_instances. However, the scheduler can pick up the dag run before all
> task_instances are created and also call {{verify_integrity}} to create
> task_instances at the same time.
> I don't _think_ this actually breaks anything in particular. The exception
> happens either on the webpage or in the scheduler logs:
> * If it occurs in the UI, it just scares people into thinking something
> broke, but the task_instances will still be created by the scheduler.
> * If the error shows up in the scheduler, the task_instances are created by
> the webserver and it continues processing the DAG during the next loop.
>
> I'm not sure whether {{DagRun.verify_integrity}} is necessary in both
> {{SchedulerJob._process_task_instances}} as well as {{Dag.create_dagrun}};
> perhaps we could just stick to one?
>
> {noformat}
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1170, in _execute_context
> context)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
> line 683, in do_executemany
> cursor.executemany(statement, parameters)
> psycopg2.IntegrityError: duplicate key value violates unique constraint
> "task_instance_pkey"
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0,
> chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in
> helper
> pickle_dags)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50,
> in wrapper
> result = func(*args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792,
> in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391,
> in _process_dags
> self._process_task_instances(dag, tis_out)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in
> _process_task_instances
> run.verify_integrity(session=session)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50,
> in wrapper
> result = func(*args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786,
> in verify_integrity
> session.commit()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 943, in commit
> self.transaction.commit()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 467, in commit
> self._prepare_impl()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 447, in _prepare_impl
> self.session.flush()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 2254, in flush
> self._flush(objects)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 2380, in _flush
> transaction.rollback(_capture_exception=True)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line
> 66, in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
> line 187, in reraise
> raise value
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py",
> line 2344, in _flush
> flush_context.execute()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py",
> line 391, in execute
> rec.execute(self)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py",
> line 556, in execute
> uow
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line
> 181, in save_obj
> mapper, table, insert)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line
> 830, in _emit_insert_statements
> execute(statement, multiparams)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 948, in execute
> return meth(self, multiparams, params)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py",
> line 269, in _execute_on_connection
> return connection._execute_clauseelement(self, multiparams, params)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1060, in _execute_clauseelement
> compiled_sql, distilled_params
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1200, in _execute_context
> context)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1413, in _handle_dbapi_exception
> exc_info
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
> line 203, in raise_from_cause
> reraise(type(exception), exception, tb=exc_tb, cause=cause)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
> line 186, in reraise
> raise value.with_traceback(tb)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
> line 1170, in _execute_context
> context)
> File
> "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
> line 683, in do_executemany
> cursor.executemany(statement, parameters)
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0,
> chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> {noformat}
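Beyond catching the exception, the duplicate insert could also be avoided at the SQL level with an upsert: PostgreSQL (>= 9.5) supports {{INSERT ... ON CONFLICT DO NOTHING}}, so the losing writer silently no-ops instead of raising. A hedged sketch of that idea, using SQLite (>= 3.24, which supports the same clause) for a self-contained demo; this is not what Airflow does, just an illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        task_id TEXT, dag_id TEXT, execution_date TEXT,
        PRIMARY KEY (task_id, dag_id, execution_date)
    )
""")

row = ("docker_task", "example_dag", "2018-03-15 23:46:57")
for writer in ("webserver", "scheduler"):
    # ON CONFLICT DO NOTHING: the second insert of the same primary key
    # is dropped silently rather than raising IntegrityError.
    conn.execute(
        "INSERT INTO task_instance VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
        row,
    )
conn.commit()
count = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
print(count)  # 1 -- a single row despite two insert attempts
```

The trade-off is that the caller no longer learns whether it created the row, which may matter if follow-up work depends on being the creator.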
--
This message was sent by Atlassian Jira
(v8.3.4#803005)