[ https://issues.apache.org/jira/browse/AIRFLOW-2219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16932479#comment-16932479 ]
Dmitry commented on AIRFLOW-2219:
---------------------------------

I had the same error a couple of times per day from TriggerDagRunOperator, but after I rewrote it the error is almost gone: the new operator has been running for a month and a half, and the issue has occurred only once in that time.

My operator code is attached. The two key changes are that I check whether a DagRun already exists before creating one, and that I use the same session in the get_dagrun and create_dagrun calls.

{code:python}
import logging
from datetime import datetime

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder


class DagRunOperator(TriggerDagRunOperator):
    template_fields = ('execution_date',)
    ui_color = '#e6ccff'

    def __init__(
            self, trigger_dag_id, python_callable, execution_date=None,
            *args, **kwargs
    ):
        self.execution_date = execution_date
        super(DagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs
        )

    def execute(self, context):
        # Build the run_id from the rendered execution_date, or from "now" if none was given.
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S') if self.execution_date is not None \
            else datetime.now()
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            # Use one session for both the existence check and the creation.
            session = settings.Session()
            try:
                dbag = DagBag(settings.DAGS_FOLDER)
                trigger_dag = dbag.get_dag(self.trigger_dag_id)
                # Only create the DagRun if none exists yet for this execution_date.
                if not trigger_dag.get_dagrun(self.execution_date, session=session):
                    logging.info("Creating DagRun...")
                    dr = trigger_dag.create_dagrun(
                        run_id=dro.run_id,
                        state=State.RUNNING,
                        execution_date=self.execution_date,
                        conf=dro.payload,
                        session=session,
                        external_trigger=True
                    )
                    logging.info("DagRun Created: {}".format(dr))
                    session.add(dr)
                    session.commit()
                else:
                    logging.info("DagRun already exists {}".format(trigger_dag))
            finally:
                # Close the session even if the check or the creation raises.
                session.close()
        else:
            logging.info("Criteria not met, moving on")
{code}

> Race condition to DagRun.verify_integrity between Scheduler and Webserver
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2219
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2219
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.8.1, 1.9.0
>            Reporter: Will Wong
>            Priority: Trivial
>
> Symptoms:
> * Triggering a DAG causes the 404 nuke page with an error message along the lines of: {{psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"}} when calling {{DagRun.verify_integrity}}
> Or
> * A similar error in the scheduler log for the DAG file when scheduling a DAG. (Example exception at the end of description)
>
> This occurs because {{Dag.create_dagrun}} commits the dag_run entry to the database and then immediately runs {{verify_integrity}} to add the task_instances. However, the scheduler can pick up the dag run before all task_instances are created, and it also calls {{verify_integrity}} to create task_instances at the same time.
>
> I don't _think_ this actually breaks anything in particular. The exception happens either on the webpage or in the scheduler logs:
> * If it occurs in the UI, it just scares people into thinking something broke, but the task_instances will be created by the scheduler.
> * If the error shows up in the scheduler, the task_instances are created by the webserver and the scheduler continues processing the DAG during the next loop.
>
> I'm not sure if {{DagRun.verify_integrity}} is necessary in both {{SchedulerJob._process_task_instances}} and {{Dag.create_dagrun}}, but perhaps we can just stick to one?
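The interleaving described above can be replayed in isolation. The following is only a minimal sketch, not Airflow code: it uses an in-memory SQLite database in place of PostgreSQL, and the table layout, the constants, and the helper names (`missing_tasks`, `insert_tasks`, `simulate`) are all hypothetical. It shows two callers that each check for missing task_instance rows before either one inserts, so the second insert hits the primary key, and then the same sequence with a duplicate-tolerant insert.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's task_instance table.
SCHEMA = """
CREATE TABLE task_instance (
    task_id TEXT NOT NULL,
    dag_id TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    PRIMARY KEY (task_id, dag_id, execution_date)
)
"""

# Illustrative values only; they do not come from the issue.
DAG_ID = "example_dag"
EXECUTION_DATE = "2018-03-15 23:46:57"
TASK_IDS = ["task_a", "task_b"]


def missing_tasks(conn):
    """Return the task ids that have no task_instance row yet."""
    rows = conn.execute(
        "SELECT task_id FROM task_instance WHERE dag_id = ? AND execution_date = ?",
        (DAG_ID, EXECUTION_DATE),
    ).fetchall()
    existing = {row[0] for row in rows}
    return [t for t in TASK_IDS if t not in existing]


def insert_tasks(conn, task_ids, ignore_duplicates=False):
    """Insert task_instance rows; optionally skip rows that already exist."""
    verb = "INSERT OR IGNORE" if ignore_duplicates else "INSERT"
    with conn:  # commits on success, rolls back if the insert raises
        conn.executemany(
            verb + " INTO task_instance (task_id, dag_id, execution_date) VALUES (?, ?, ?)",
            [(t, DAG_ID, EXECUTION_DATE) for t in task_ids],
        )


def simulate(ignore_duplicates):
    """Replay the race: both callers check before either one inserts."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    webserver_todo = missing_tasks(conn)  # the webserver sees no rows yet
    scheduler_todo = missing_tasks(conn)  # the scheduler also sees no rows yet
    insert_tasks(conn, webserver_todo, ignore_duplicates)
    try:
        insert_tasks(conn, scheduler_todo, ignore_duplicates)
        outcome = "ok"
    except sqlite3.IntegrityError:
        outcome = "duplicate key"
    count = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
    conn.close()
    return outcome, count


if __name__ == "__main__":
    print(simulate(ignore_duplicates=False))
    print(simulate(ignore_duplicates=True))
```

In both runs the table ends up with exactly one row per task, which matches the observation that nothing is actually lost; only the plain `INSERT` variant surfaces the error. `INSERT OR IGNORE` is SQLite syntax; PostgreSQL offers `INSERT ... ON CONFLICT DO NOTHING` for a similar effect. Whether Airflow should tolerate duplicates this way or call {{verify_integrity}} from only one place is a separate design question that this sketch does not settle.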
>
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
>     context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
>     cursor.executemany(statement, parameters)
> psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
>
> The above exception was the direct cause of the following exception:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in helper
>     pickle_dags)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391, in _process_dags
>     self._process_task_instances(dag, tis_out)
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in _process_task_instances
>     run.verify_integrity(session=session)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786, in verify_integrity
>     session.commit()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 943, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
>     raise value
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in execute
>     uow
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 830, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
>     context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>     raise value.with_traceback(tb)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
>     context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
>     cursor.executemany(statement, parameters)
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> {noformat}

--
This message was sent by Atlassian Jira (v8.3.4#803005)