potiuk commented on issue #32778:
URL: https://github.com/apache/airflow/issues/32778#issuecomment-1646822882

   More information - after separating it out into a separate job that runs 
sequentially - it seems that it fails much more often, even in isolation: 
   
   
https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:538
   
   ```
   __________________ TestDaskExecutor.test_backfill_integration 
__________________
     
     self = <sqlalchemy.future.engine.Connection object at 0x7f99d4144100>
     dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 
object at 
0x7f9a2d[526](https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:539)130>
     constructor = <bound method DefaultExecutionContext._init_compiled of 
<class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
     statement = 'UPDATE dag_run SET 
last_scheduling_decision=%(last_scheduling_decision)s, 
updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
     parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 
'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, 
tzinfo=Timezone('UTC'))}
     execution_options = immutabledict({'autocommit': True, 'compiled_cache': 
{(<sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 obj...ed_at'), 
False, False), <sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 
object at 0x7f99d41ae3a0>, 208]}})
     args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 
object at 0x7f99d41a5fa0>, [{'dag_run_id': 1, 'last_sche...n': None}], 
<sqlalchemy.sql.dml.Update object at 0x7f99d58688e0>, 
[BindParameter('dag_run_id', None, type_=Integer())])
     kw = {'cache_hit': symbol('CACHE_HIT')}
     branched = <sqlalchemy.future.engine.Connection object at 0x7f99d4144100>
     yp = None
     conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f99d4159a60>
     context = 
<sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 
0x7f99d41a8c70>
     cursor = <cursor object at 0x7f99d6b97c70; closed: -1>, evt_handled = False
     
         def _execute_context(
             self,
             dialect,
             constructor,
             statement,
             parameters,
             execution_options,
             *args,
             **kw
         ):
             """Create an :class:`.ExecutionContext` and execute, returning
             a :class:`_engine.CursorResult`."""
         
             branched = self
             if self.__branch_from:
                 # if this is a "branched" connection, do everything in terms
                 # of the "root" connection, *except* for .close(), which is
                 # the only feature that branching provides
                 self = self.__branch_from
         
             if execution_options:
                 yp = execution_options.get("yield_per", None)
                 if yp:
                     execution_options = execution_options.union(
                         {"stream_results": True, "max_row_buffer": yp}
                     )
         
             try:
                 conn = self._dbapi_connection
                 if conn is None:
                     conn = self._revalidate_connection()
         
                 context = constructor(
                     dialect, self, conn, execution_options, *args, **kw
                 )
             except (exc.PendingRollbackError, exc.ResourceClosedError):
                 raise
             except BaseException as e:
                 self._handle_dbapi_exception(
                     e, util.text_type(statement), parameters, None, None
                 )
         
             if (
                 self._transaction
                 and not self._transaction.is_active
                 or (
                     self._nested_transaction
                     and not self._nested_transaction.is_active
                 )
             ):
                 self._invalid_transaction()
         
             elif self._trans_context_manager:
                 TransactionalContext._trans_ctx_check(self)
         
             if self._is_future and self._transaction is None:
                 self._autobegin()
         
             context.pre_exec()
         
             if dialect.use_setinputsizes:
                 context._set_input_sizes()
         
             cursor, statement, parameters = (
                 context.cursor,
                 context.statement,
                 context.parameters,
             )
         
             if not context.executemany:
                 parameters = parameters[0]
         
             if self._has_events or self.engine._has_events:
                 for fn in self.dispatch.before_cursor_execute:
                     statement, parameters = fn(
                         self,
                         cursor,
                         statement,
                         parameters,
                         context,
                         context.executemany,
                     )
         
             if self._echo:
         
                 self._log_info(statement)
         
                 stats = context._get_cache_stats()
         
                 if not self.engine.hide_parameters:
                     self._log_info(
                         "[%s] %r",
                         stats,
                         sql_util._repr_params(
                             parameters, batches=10, ismulti=context.executemany
                         ),
                     )
                 else:
                     self._log_info(
                         "[%s] [SQL parameters hidden due to 
hide_parameters=True]"
                         % (stats,)
                     )
         
             evt_handled = False
             try:
                 if context.executemany:
                     if self.dialect._has_events:
                         for fn in self.dialect.dispatch.do_executemany:
                             if fn(cursor, statement, parameters, context):
                                 evt_handled = True
                                 break
                     if not evt_handled:
                         self.dialect.do_executemany(
                             cursor, statement, parameters, context
                         )
                 elif not parameters and context.no_parameters:
                     if self.dialect._has_events:
                         for fn in self.dialect.dispatch.do_execute_no_params:
                             if fn(cursor, statement, context):
                                 evt_handled = True
                                 break
                     if not evt_handled:
                         self.dialect.do_execute_no_params(
                             cursor, statement, context
                         )
                 else:
                     if self.dialect._has_events:
                         for fn in self.dialect.dispatch.do_execute:
                             if fn(cursor, statement, parameters, context):
                                 evt_handled = True
                                 break
                     if not evt_handled:
     >                   self.dialect.do_execute(
                             cursor, statement, parameters, context
                         )
     
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     
     self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object 
at 0x7f9a2d526130>
     cursor = <cursor object at 0x7f99d6b97c70; closed: -1>
     statement = 'UPDATE dag_run SET 
last_scheduling_decision=%(last_scheduling_decision)s, 
updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
     parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 
'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, 
tzinfo=Timezone('UTC'))}
     context = 
<sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 
0x7f99d41a8c70>
     
         def do_execute(self, cursor, statement, parameters, context=None):
     >       cursor.execute(statement, parameters)
     E       psycopg2.errors.DeadlockDetected: deadlock detected
     E       DETAIL:  Process 206 waits for ShareLock on transaction 2211; 
blocked by process 265.
     E       Process 265 waits for ShareLock on transaction 2210; blocked by 
process 206.
     E       HINT:  See server log for query details.
     E       CONTEXT:  while updating tuple (0,215) in relation "dag_run"
     
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: 
DeadlockDetected
     
     The above exception was the direct cause of the following exception:
     
     self = <airflow.jobs.backfill_job_runner.BackfillJobRunner object at 
0x7f99d608fcd0>
     session = <sqlalchemy.orm.session.Session object at 0x7f99fa1165b0>
     
         @provide_session
         def _execute(self, session: Session = NEW_SESSION) -> None:
             """
             Initialize all required components of a dag for a specified date 
range and execute the tasks.
         
             :meta private:
             """
             ti_status = BackfillJobRunner._DagRunTaskStatus()
         
             start_date = self.bf_start_date
         
             # Get DagRun schedule between the start/end dates, which will turn 
into dag runs.
             dagrun_start_date = timezone.coerce_datetime(start_date)
             if self.bf_end_date is None:
                 dagrun_end_date = pendulum.now(timezone.utc)
             else:
                 dagrun_end_date = pendulum.instance(self.bf_end_date)
             dagrun_infos = 
list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
             if self.run_backwards:
                 tasks_that_depend_on_past = [t.task_id for t in 
self.dag.task_dict.values() if t.depends_on_past]
                 if tasks_that_depend_on_past:
                     raise AirflowException(
                         f"You cannot backfill backwards because one or more "
                         f'tasks depend_on_past: 
{",".join(tasks_that_depend_on_past)}'
                     )
                 dagrun_infos = dagrun_infos[::-1]
         
             if not dagrun_infos:
                 if not self.run_at_least_once:
                     self.log.info("No run dates were found for the given dates 
and dag interval.")
                     return
                 dagrun_infos = [DagRunInfo.interval(dagrun_start_date, 
dagrun_end_date)]
         
             dag_with_subdags_ids = [d.dag_id for d in 
self._get_dag_with_subdags()]
             running_dagruns = DagRun.find(
                 dag_id=dag_with_subdags_ids,
                 execution_start_date=self.bf_start_date,
                 execution_end_date=self.bf_end_date,
                 no_backfills=True,
                 state=DagRunState.RUNNING,
             )
         
             if running_dagruns:
                 for run in running_dagruns:
                     self.log.error(
                         "Backfill cannot be created for DagRun %s in %s, as 
there's already %s in a RUNNING "
                         "state.",
                         run.run_id,
                         run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
                         run.run_type,
                     )
                 self.log.error(
                     "Changing DagRun into BACKFILL would cause scheduler to 
lose track of executing "
                     "tasks. Not changing DagRun type into BACKFILL, and trying 
insert another DagRun into "
                     "database would cause database constraint violation for 
dag_id + execution_date "
                     "combination. Please adjust backfill dates or wait for 
this DagRun to finish.",
                 )
                 return
             # picklin'
             pickle_id = None
         
             executor_class, _ = ExecutorLoader.import_default_executor_cls()
         
             if not self.donot_pickle and executor_class.supports_pickling:
                 pickle = DagPickle(self.dag)
                 session.add(pickle)
                 session.commit()
                 pickle_id = pickle.id
         
             executor = self.job.executor
             executor.job_id = self.job.id
             executor.start()
         
             ti_status.total_runs = len(dagrun_infos)  # total dag runs in 
backfill
         
             try:
                 remaining_dates = ti_status.total_runs
                 while remaining_dates > 0:
                     dagrun_infos_to_process = [
                         dagrun_info
                         for dagrun_info in dagrun_infos
                         if dagrun_info.logical_date not in 
ti_status.executed_dag_run_dates
                     ]
     >               self._execute_dagruns(
                         dagrun_infos=dagrun_infos_to_process,
                         ti_status=ti_status,
                         executor=executor,
                         pickle_id=pickle_id,
                         start_date=start_date,
                         session=session,
                     )
     
     airflow/jobs/backfill_job_runner.py:914: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     airflow/utils/session.py:74: in wrapper
         return func(*args, **kwargs)
     airflow/jobs/backfill_job_runner.py:802: in _execute_dagruns
         processed_dag_run_dates = self._process_backfill_task_instances(
     airflow/jobs/backfill_job_runner.py:645: in 
_process_backfill_task_instances
         session.commit()
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in 
commit
         self._transaction.commit(_to_root=self.future)
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in 
commit
         self._prepare_impl()
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in 
_prepare_impl
         self.session.flush()
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in 
flush
         self._flush(objects)
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in 
_flush
         transaction.rollback(_capture_exception=True)
     /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: 
in __exit__
         compat.raise_(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in 
raise_
         raise exception
     
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3[549](https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:562):
 in _flush
         flush_context.execute()
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: 
in execute
         rec.execute(self)
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: 
in execute
         util.preloaded.orm_persistence.save_obj(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: 
in save_obj
         _emit_update_statements(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: 
in _emit_update_statements
         c = connection._execute_20(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in 
_execute_20
         return meth(self, args_10style, kwargs_10style, execution_options)
     /usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in 
_execute_on_connection
         return connection._execute_clauseelement(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in 
_execute_clauseelement
         ret = self._execute_context(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in 
_execute_context
         self._handle_dbapi_exception(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in 
_handle_dbapi_exception
         util.raise_(
     /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in 
raise_
         raise exception
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in 
_execute_context
         self.dialect.do_execute(
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     
     self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object 
at 0x7f9a2d526130>
     cursor = <cursor object at 0x7f99d6b97c70; closed: -1>
     statement = 'UPDATE dag_run SET 
last_scheduling_decision=%(last_scheduling_decision)s, 
updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
     parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 
'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, 
tzinfo=Timezone('UTC'))}
     context = 
<sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 
0x7f99d41a8c70>
     
         def do_execute(self, cursor, statement, parameters, context=None):
     >       cursor.execute(statement, parameters)
     E       sqlalchemy.exc.OperationalError: 
(psycopg2.errors.DeadlockDetected) deadlock detected
     E       DETAIL:  Process 206 waits for ShareLock on transaction 2211; 
blocked by process 265.
     E       Process 265 waits for ShareLock on transaction 2210; blocked by 
process 206.
     E       HINT:  See server log for query details.
     E       CONTEXT:  while updating tuple (0,215) in relation "dag_run"
     E       
     E       [SQL: UPDATE dag_run SET 
last_scheduling_decision=%(last_scheduling_decision)s, 
updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
     E       [parameters: {'last_scheduling_decision': None, 'updated_at': 
datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC')), 
'dag_run_id': 1}]
     E       (Background on this error at: https://sqlalche.me/e/14/e3q8)
     
     /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: 
OperationalError
     
     During handling of the above exception, another exception occurred:
     
     self = <tests.executors.test_dask_executor.TestDaskExecutor object at 
0x7f99fe7af340>
     
         @pytest.mark.quarantined
         @pytest.mark.execution_timeout(180)
         def test_backfill_integration(self):
             """
             Test that DaskExecutor can be used to backfill example dags
             """
             dag = self.dagbag.get_dag("example_bash_operator")
         
             job = Job(
                 
executor=DaskExecutor(cluster_address=self.cluster.scheduler_address),
             )
             job_runner = BackfillJobRunner(
                 job=job,
                 dag=dag,
                 start_date=DEFAULT_DATE,
                 end_date=DEFAULT_DATE,
                 ignore_first_depends_on_past=True,
             )
     >       run_job(job=job, execute_callable=job_runner._execute)
     
     tests/executors/test_dask_executor.py:125: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     airflow/utils/session.py:77: in wrapper
         return func(*args, session=session, **kwargs)
     airflow/jobs/job.py:280: in run_job
         return execute_job(job, execute_callable=execute_callable)
     airflow/jobs/job.py:309: in execute_job
         ret = execute_callable()
     airflow/utils/session.py:77: in wrapper
         return func(*args, session=session, **kwargs)
     airflow/jobs/backfill_job_runner.py:943: in _execute
         session.commit()
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in 
commit
         self._transaction.commit(_to_root=self.future)
     /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:830: in 
commit
         self._assert_active(prepared_ok=True)
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     
     self = <sqlalchemy.orm.session.SessionTransaction object at 0x7f99d4154a30>
     prepared_ok = True, rollback_ok = False, deactive_ok = False
     closed_msg = 'This transaction is closed'
     
         def _assert_active(
             self,
             prepared_ok=False,
             rollback_ok=False,
             deactive_ok=False,
             closed_msg="This transaction is closed",
         ):
             if self._state is COMMITTED:
                 raise sa_exc.InvalidRequestError(
                     "This session is in 'committed' state; no further "
                     "SQL can be emitted within this transaction."
                 )
             elif self._state is PREPARED:
                 if not prepared_ok:
                     raise sa_exc.InvalidRequestError(
                         "This session is in 'prepared' state; no further "
                         "SQL can be emitted within this transaction."
                     )
             elif self._state is DEACTIVE:
                 if not deactive_ok and not rollback_ok:
                     if self._rollback_exception:
     >                   raise sa_exc.PendingRollbackError(
                             "This Session's transaction has been rolled back "
                             "due to a previous exception during flush."
                             " To begin a new transaction with this Session, "
                             "first issue Session.rollback()."
                             " Original exception was: %s"
                             % self._rollback_exception,
                             code="7s2a",
     E                       sqlalchemy.exc.PendingRollbackError: This 
Session's transaction has been rolled back due to a previous exception during 
flush. To begin a new transaction with this Session, first issue 
Session.rollback(). Original exception was: (psycopg2.errors.DeadlockDetected) 
deadlock detected
     E                       DETAIL:  Process 206 waits for ShareLock on 
transaction 2211; blocked by process 265.
     E                       Process 265 waits for ShareLock on transaction 
2210; blocked by process 206.
     E                       HINT:  See server log for query details.
     E                       CONTEXT:  while updating tuple (0,215) in relation 
"dag_run"
     E                       
     E                       [SQL: UPDATE dag_run SET 
last_scheduling_decision=%(last_scheduling_decision)s, 
updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
     E                       [parameters: {'last_scheduling_decision': None, 
'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, 
tzinfo=Timezone('UTC')), 'dag_run_id': 1}]
     E                       (Background on this error at: 
https://sqlalche.me/e/14/e3q8) (Background on this error at: 
https://sqlalche.me/e/14/7s2a)
     
     
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:[604](https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:617):
 PendingRollbackError
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to