potiuk commented on issue #32778: URL: https://github.com/apache/airflow/issues/32778#issuecomment-1646822882
More information - after separating it out to a separate job that runs sequentially - it seems that it fails much more often and even in isolation: https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:538 ``` __________________ TestDaskExecutor.test_backfill_integration __________________ self = <sqlalchemy.future.engine.Connection object at 0x7f99d4144100> dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7f9a2d[526](https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:539)130> constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>> statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s' parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC'))} execution_options = immutabledict({'autocommit': True, 'compiled_cache': {(<sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 obj...ed_at'), False, False), <sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0x7f99d41ae3a0>, 208]}}) args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0x7f99d41a5fa0>, [{'dag_run_id': 1, 'last_sche...n': None}], <sqlalchemy.sql.dml.Update object at 0x7f99d58688e0>, [BindParameter('dag_run_id', None, type_=Integer())]) kw = {'cache_hit': symbol('CACHE_HIT')} branched = <sqlalchemy.future.engine.Connection object at 0x7f99d4144100> yp = None conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f99d4159a60> context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7f99d41a8c70> cursor = <cursor object at 0x7f99d6b97c70; closed: -1>, evt_handled = False def _execute_context( self, dialect, constructor, statement, 
parameters, execution_options, *args, **kw ): """Create an :class:`.ExecutionContext` and execute, returning a :class:`_engine.CursorResult`.""" branched = self if self.__branch_from: # if this is a "branched" connection, do everything in terms # of the "root" connection, *except* for .close(), which is # the only feature that branching provides self = self.__branch_from if execution_options: yp = execution_options.get("yield_per", None) if yp: execution_options = execution_options.union( {"stream_results": True, "max_row_buffer": yp} ) try: conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() context = constructor( dialect, self, conn, execution_options, *args, **kw ) except (exc.PendingRollbackError, exc.ResourceClosedError): raise except BaseException as e: self._handle_dbapi_exception( e, util.text_type(statement), parameters, None, None ) if ( self._transaction and not self._transaction.is_active or ( self._nested_transaction and not self._nested_transaction.is_active ) ): self._invalid_transaction() elif self._trans_context_manager: TransactionalContext._trans_ctx_check(self) if self._is_future and self._transaction is None: self._autobegin() context.pre_exec() if dialect.use_setinputsizes: context._set_input_sizes() cursor, statement, parameters = ( context.cursor, context.statement, context.parameters, ) if not context.executemany: parameters = parameters[0] if self._has_events or self.engine._has_events: for fn in self.dispatch.before_cursor_execute: statement, parameters = fn( self, cursor, statement, parameters, context, context.executemany, ) if self._echo: self._log_info(statement) stats = context._get_cache_stats() if not self.engine.hide_parameters: self._log_info( "[%s] %r", stats, sql_util._repr_params( parameters, batches=10, ismulti=context.executemany ), ) else: self._log_info( "[%s] [SQL parameters hidden due to hide_parameters=True]" % (stats,) ) evt_handled = False try: if context.executemany: if 
self.dialect._has_events: for fn in self.dialect.dispatch.do_executemany: if fn(cursor, statement, parameters, context): evt_handled = True break if not evt_handled: self.dialect.do_executemany( cursor, statement, parameters, context ) elif not parameters and context.no_parameters: if self.dialect._has_events: for fn in self.dialect.dispatch.do_execute_no_params: if fn(cursor, statement, context): evt_handled = True break if not evt_handled: self.dialect.do_execute_no_params( cursor, statement, context ) else: if self.dialect._has_events: for fn in self.dialect.dispatch.do_execute: if fn(cursor, statement, parameters, context): evt_handled = True break if not evt_handled: > self.dialect.do_execute( cursor, statement, parameters, context ) /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7f9a2d526130> cursor = <cursor object at 0x7f99d6b97c70; closed: -1> statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s' parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC'))} context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7f99d41a8c70> def do_execute(self, cursor, statement, parameters, context=None): > cursor.execute(statement, parameters) E psycopg2.errors.DeadlockDetected: deadlock detected E DETAIL: Process 206 waits for ShareLock on transaction 2211; blocked by process 265. E Process 265 waits for ShareLock on transaction 2210; blocked by process 206. E HINT: See server log for query details. 
E CONTEXT: while updating tuple (0,215) in relation "dag_run" /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: DeadlockDetected The above exception was the direct cause of the following exception: self = <airflow.jobs.backfill_job_runner.BackfillJobRunner object at 0x7f99d608fcd0> session = <sqlalchemy.orm.session.Session object at 0x7f99fa1165b0> @provide_session def _execute(self, session: Session = NEW_SESSION) -> None: """ Initialize all required components of a dag for a specified date range and execute the tasks. :meta private: """ ti_status = BackfillJobRunner._DagRunTaskStatus() start_date = self.bf_start_date # Get DagRun schedule between the start/end dates, which will turn into dag runs. dagrun_start_date = timezone.coerce_datetime(start_date) if self.bf_end_date is None: dagrun_end_date = pendulum.now(timezone.utc) else: dagrun_end_date = pendulum.instance(self.bf_end_date) dagrun_infos = list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date)) if self.run_backwards: tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past] if tasks_that_depend_on_past: raise AirflowException( f"You cannot backfill backwards because one or more " f'tasks depend_on_past: {",".join(tasks_that_depend_on_past)}' ) dagrun_infos = dagrun_infos[::-1] if not dagrun_infos: if not self.run_at_least_once: self.log.info("No run dates were found for the given dates and dag interval.") return dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)] dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()] running_dagruns = DagRun.find( dag_id=dag_with_subdags_ids, execution_start_date=self.bf_start_date, execution_end_date=self.bf_end_date, no_backfills=True, state=DagRunState.RUNNING, ) if running_dagruns: for run in running_dagruns: self.log.error( "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING " "state.", run.run_id, 
run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"), run.run_type, ) self.log.error( "Changing DagRun into BACKFILL would cause scheduler to lose track of executing " "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into " "database would cause database constraint violation for dag_id + execution_date " "combination. Please adjust backfill dates or wait for this DagRun to finish.", ) return # picklin' pickle_id = None executor_class, _ = ExecutorLoader.import_default_executor_cls() if not self.donot_pickle and executor_class.supports_pickling: pickle = DagPickle(self.dag) session.add(pickle) session.commit() pickle_id = pickle.id executor = self.job.executor executor.job_id = self.job.id executor.start() ti_status.total_runs = len(dagrun_infos) # total dag runs in backfill try: remaining_dates = ti_status.total_runs while remaining_dates > 0: dagrun_infos_to_process = [ dagrun_info for dagrun_info in dagrun_infos if dagrun_info.logical_date not in ti_status.executed_dag_run_dates ] > self._execute_dagruns( dagrun_infos=dagrun_infos_to_process, ti_status=ti_status, executor=executor, pickle_id=pickle_id, start_date=start_date, session=session, ) airflow/jobs/backfill_job_runner.py:914: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ airflow/utils/session.py:74: in wrapper return func(*args, **kwargs) airflow/jobs/backfill_job_runner.py:802: in _execute_dagruns processed_dag_run_dates = self._process_backfill_task_instances( airflow/jobs/backfill_job_runner.py:645: in _process_backfill_task_instances session.commit() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit self._transaction.commit(_to_root=self.future) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit self._prepare_impl() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl self.session.flush() 
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush self._flush(objects) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush transaction.rollback(_capture_exception=True) /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__ compat.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3[549](https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:562): in _flush flush_context.execute() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute rec.execute(self) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute util.preloaded.orm_persistence.save_obj( /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj _emit_update_statements( /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements c = connection._execute_20( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) /usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection return connection._execute_clauseelement( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement ret = self._execute_context( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context self._handle_dbapi_exception( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception util.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context self.dialect.do_execute( _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7f9a2d526130> cursor = <cursor object at 0x7f99d6b97c70; closed: -1> statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s' parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC'))} context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7f99d41a8c70> def do_execute(self, cursor, statement, parameters, context=None): > cursor.execute(statement, parameters) E sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected E DETAIL: Process 206 waits for ShareLock on transaction 2211; blocked by process 265. E Process 265 waits for ShareLock on transaction 2210; blocked by process 206. E HINT: See server log for query details. E CONTEXT: while updating tuple (0,215) in relation "dag_run" E E [SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s] E [parameters: {'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC')), 'dag_run_id': 1}] E (Background on this error at: https://sqlalche.me/e/14/e3q8) /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: OperationalError During handling of the above exception, another exception occurred: self = <tests.executors.test_dask_executor.TestDaskExecutor object at 0x7f99fe7af340> @pytest.mark.quarantined @pytest.mark.execution_timeout(180) def test_backfill_integration(self): """ Test that DaskExecutor can be used to backfill example dags """ dag = self.dagbag.get_dag("example_bash_operator") job = Job( executor=DaskExecutor(cluster_address=self.cluster.scheduler_address), ) job_runner = BackfillJobRunner( 
job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_first_depends_on_past=True, ) > run_job(job=job, execute_callable=job_runner._execute) tests/executors/test_dask_executor.py:125: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ airflow/utils/session.py:77: in wrapper return func(*args, session=session, **kwargs) airflow/jobs/job.py:280: in run_job return execute_job(job, execute_callable=execute_callable) airflow/jobs/job.py:309: in execute_job ret = execute_callable() airflow/utils/session.py:77: in wrapper return func(*args, session=session, **kwargs) airflow/jobs/backfill_job_runner.py:943: in _execute session.commit() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit self._transaction.commit(_to_root=self.future) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:830: in commit self._assert_active(prepared_ok=True) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <sqlalchemy.orm.session.SessionTransaction object at 0x7f99d4154a30> prepared_ok = True, rollback_ok = False, deactive_ok = False closed_msg = 'This transaction is closed' def _assert_active( self, prepared_ok=False, rollback_ok=False, deactive_ok=False, closed_msg="This transaction is closed", ): if self._state is COMMITTED: raise sa_exc.InvalidRequestError( "This session is in 'committed' state; no further " "SQL can be emitted within this transaction." ) elif self._state is PREPARED: if not prepared_ok: raise sa_exc.InvalidRequestError( "This session is in 'prepared' state; no further " "SQL can be emitted within this transaction." ) elif self._state is DEACTIVE: if not deactive_ok and not rollback_ok: if self._rollback_exception: > raise sa_exc.PendingRollbackError( "This Session's transaction has been rolled back " "due to a previous exception during flush." " To begin a new transaction with this Session, " "first issue Session.rollback()." 
" Original exception was: %s" % self._rollback_exception, code="7s2a", E sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.DeadlockDetected) deadlock detected E DETAIL: Process 206 waits for ShareLock on transaction 2211; blocked by process 265. E Process 265 waits for ShareLock on transaction 2210; blocked by process 206. E HINT: See server log for query details. E CONTEXT: while updating tuple (0,215) in relation "dag_run" E E [SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s] E [parameters: {'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC')), 'dag_run_id': 1}] E (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on this error at: https://sqlalche.me/e/14/7s2a) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:[604](https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:617): PendingRollbackError ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
