potiuk commented on PR #33150:
URL: https://github.com/apache/airflow/pull/33150#issuecomment-1666814265

   Example failure: 
https://github.com/apache/airflow/actions/runs/5775967682/job/15654656922?pr=32991
   
   ```
   ________________________ test_xcom_map_error_fails_task 
________________________
   
   self = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
   dialect = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object 
at 0x7f5ad1451db0>
   constructor = <bound method DefaultExecutionContext._init_compiled of <class 
'sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext'>>
   statement = <sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 
0x7f5ad06c7970>
   parameters = []
   execution_options = immutabledict({'_sa_orm_load_options': 
default_load_options(_legacy_uniquing=True), 
'_result_disable_adapt_to_context': True, 'future_result': True})
   args = (<sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 
0x7f5ad06c7970>, [], <sqlalchemy.sql.selectable.Select obje...0720 run_id)s', 
'test', type_=String(length=250)), BindParameter('%(140026414543824 
map_index)s', 0, type_=Integer())])
   kw = {'cache_hit': symbol('CACHE_HIT')}
   branched = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
   yp = None
   conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
   
       def _execute_context(
           self,
           dialect,
           constructor,
           statement,
           parameters,
           execution_options,
           *args,
           **kw
       ):
           """Create an :class:`.ExecutionContext` and execute, returning
           a :class:`_engine.CursorResult`."""
       
           branched = self
           if self.__branch_from:
               # if this is a "branched" connection, do everything in terms
               # of the "root" connection, *except* for .close(), which is
               # the only feature that branching provides
               self = self.__branch_from
       
           if execution_options:
               yp = execution_options.get("yield_per", None)
               if yp:
                   execution_options = execution_options.union(
                       {"stream_results": True, "max_row_buffer": yp}
                   )
       
           try:
               conn = self._dbapi_connection
               if conn is None:
                   conn = self._revalidate_connection()
       
   >           context = constructor(
                   dialect, self, conn, execution_options, *args, **kw
               )
   
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1810: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1020: 
in _init_compiled
       self.cursor = self.create_cursor()
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1391: 
in create_cursor
       return self.create_default_cursor()
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1394: 
in create_default_cursor
       return self._dbapi_connection.cursor()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
   args = (), kwargs = {}
   
       def cursor(self, *args, **kwargs):
           """Return a new DBAPI cursor for the underlying connection.
       
           This method is a proxy for the ``connection.cursor()`` DBAPI
           method.
       
           """
   >       return self.dbapi_connection.cursor(*args, **kwargs)
   E       sqlite3.ProgrammingError: SQLite objects created in a thread can 
only be used in that same thread. The object was created in thread id 
140026872592128 and this is thread id 140028343548800.
   
   /usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py:1133: 
ProgrammingError
   
   The above exception was the direct cause of the following exception:
   
       @contextlib.contextmanager
       def create_session() -> Generator[settings.SASession, None, None]:
           """Contextmanager that will create and teardown a session."""
           Session = getattr(settings, "Session", None)
           if Session is None:
               raise RuntimeError("Session must be set before!")
           session = Session()
           try:
   >           yield session
   
   airflow/utils/session.py:36: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   airflow/utils/session.py:77: in wrapper
       return func(*args, session=session, **kwargs)
   airflow/models/taskinstance.py:1840: in run
       self._run_raw_task(
   airflow/utils/session.py:74: in wrapper
       return func(*args, **kwargs)
   airflow/models/taskinstance.py:1488: in _run_raw_task
       self.refresh_from_db(session=session)
   airflow/utils/session.py:74: in wrapper
       return func(*args, **kwargs)
   airflow/models/taskinstance.py:866: in refresh_from_db
       ti = qry.one_or_none()
   /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2850: in 
one_or_none
       return self._iter().one_or_none()
   /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2916: in 
_iter
       result = self.session.execute(
   /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py:1717: in 
execute
       result = conn._execute_20(statement, params or {}, execution_options)
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1710: in 
_execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
   /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:334: in 
_execute_on_connection
       return connection._execute_clauseelement(
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1577: in 
_execute_clauseelement
       ret = self._execute_context(
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1816: in 
_execute_context
       self._handle_dbapi_exception(
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2134: in 
_handle_dbapi_exception
       util.raise_(
   /usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py:211: in 
raise_
       raise exception
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1810: in 
_execute_context
       context = constructor(
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1020: 
in _init_compiled
       self.cursor = self.create_cursor()
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1391: 
in create_cursor
       return self.create_default_cursor()
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1394: 
in create_default_cursor
       return self._dbapi_connection.cursor()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
   args = (), kwargs = {}
   
       def cursor(self, *args, **kwargs):
           """Return a new DBAPI cursor for the underlying connection.
       
           This method is a proxy for the ``connection.cursor()`` DBAPI
           method.
       
           """
   >       return self.dbapi_connection.cursor(*args, **kwargs)
   E       sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite 
objects created in a thread can only be used in that same thread. The object 
was created in thread id 140026872592128 and this is thread id 140028343548800.
   E       [SQL: SELECT task_instance.task_id AS task_instance_task_id, 
task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS 
task_instance_run_id, task_instance.map_index AS task_instance_map_index, 
task_instance.start_date AS task_instance_start_date, task_instance.end_date AS 
task_instance_end_date, task_instance.duration AS task_instance_duration, 
task_instance.state AS task_instance_state, task_instance.try_number AS 
task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, 
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS 
task_instance_unixname, task_instance.job_id AS task_instance_job_id, 
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS 
task_instance_pool_slots, task_instance.queue AS task_instance_queue, 
task_instance.priority_weight AS task_instance_priority_weight, 
task_instance.operator AS task_instance_operator, 
task_instance.custom_operator_name AS task_instance_custom_operator_name, task
 _instance.queued_dttm AS task_instance_queued_dttm, 
task_instance.queued_by_job_id AS task_instance_queued_by_job_id, 
task_instance.pid AS task_instance_pid, task_instance.executor_config AS 
task_instance_executor_config, task_instance.updated_at AS 
task_instance_updated_at, task_instance.external_executor_id AS 
task_instance_external_executor_id, task_instance.trigger_id AS 
task_instance_trigger_id, task_instance.trigger_timeout AS 
task_instance_trigger_timeout, task_instance.next_method AS 
task_instance_next_method, task_instance.next_kwargs AS 
task_instance_next_kwargs 
   E       FROM task_instance 
   E       WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND 
task_instance.run_id = ? AND task_instance.map_index = ?]
   E       (Background on this error at: https://sqlalche.me/e/14/f405)
   
   /usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py:1133: 
ProgrammingError
   
   During handling of the above exception, another exception occurred:
   
   self = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
   
       def _rollback_impl(self):
           assert not self.__branch_from
       
           if self._has_events or self.engine._has_events:
               self.dispatch.rollback(self)
       
           if self._still_open_and_dbapi_connection_is_valid:
               if self._echo:
                   if self._is_autocommit_isolation():
                       self._log_info(
                           "ROLLBACK using DBAPI connection.rollback(), "
                           "DBAPI should ignore due to autocommit mode"
                       )
                   else:
                       self._log_info("ROLLBACK")
               try:
   >               self.engine.dialect.do_rollback(self.connection)
   
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1062: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 
0x7f5ad1451db0>
   dbapi_connection = <sqlalchemy.pool.base._ConnectionFairy object at 
0x7f5a70b2add0>
   
       def do_rollback(self, dbapi_connection):
   >       dbapi_connection.rollback()
   E       sqlite3.ProgrammingError: SQLite objects created in a thread can 
only be used in that same thread. The object was created in thread id 
140026872592128 and this is thread id 140028343548800.
   
   /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:683: 
ProgrammingError
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to