potiuk commented on issue #33178:
URL: https://github.com/apache/airflow/issues/33178#issuecomment-1668144144

   Possibly related case: 
   
   
https://github.com/apache/airflow/actions/runs/5778361625/job/15659550497#step:5:8291
   
   
   This is `test_upstream_in_mapped_group_triggers_only_relevant`, but it likely 
has a similar root cause (SQL commands being out of sync / issued in the wrong sequence). 
   
   ```
   _____________ test_upstream_in_mapped_group_triggers_only_relevant 
_____________
   
   self = Engine(***postgres/airflow)
   fn = <bound method Pool.connect of <sqlalchemy.pool.impl.QueuePool object at 
0x7fc52cc83700>>
   connection = None
   
       def _wrap_pool_connect(self, fn, connection):
           dialect = self.dialect
           try:
   >           return fn()
   
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3371: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:327: in 
connect
       return _ConnectionFairy._checkout(self)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:995: in 
_checkout
       del fairy
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in 
__exit__
       compat.raise_(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in 
raise_
       raise exception
   /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:928: in 
_checkout
       result = pool._dialect.do_ping(fairy.dbapi_connection)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 
0x7fc5e0105370>
   dbapi_connection = <connection object at 0x7fc5df537180; dsn: 'user=postgres 
*** dbname=airflow host=postgres', closed: 1>
   
       def do_ping(self, dbapi_connection):
           cursor = None
           before_autocommit = dbapi_connection.autocommit
           try:
               if not before_autocommit:
                   dbapi_connection.autocommit = True
               cursor = dbapi_connection.cursor()
               try:
                   cursor.execute(self._dialect_specific_select_one)
               finally:
                   cursor.close()
                   if not before_autocommit and not dbapi_connection.closed:
   >                   dbapi_connection.autocommit = before_autocommit
   E                   psycopg2.ProgrammingError: set_session cannot be used 
inside a transaction
   
   
/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py:877:
 ProgrammingError
   
   The above exception was the direct cause of the following exception:
   
   dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 
0x7fc525ff6ee0>
   session = <sqlalchemy.orm.session.Session object at 0x7fc5c0e661c0>
   
       def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, 
session):
           from airflow.decorators import task, task_group
       
           with dag_maker(session=session):
       
               @task
               def t(x):
                   return x
       
               @task_group
               def tg(x):
                   t1 = t.override(task_id="t1")(x=x)
                   return t.override(task_id="t2")(x=t1)
       
               t2 = tg.expand(x=[1, 2, 3])
               t.override(task_id="t3")(x=t2)
       
           dr: DagRun = dag_maker.create_dagrun()
       
           def _one_scheduling_decision_iteration() -> dict[tuple[str, int], 
TaskInstance]:
               decision = dr.task_instance_scheduling_decisions(session=session)
               return {(ti.task_id, ti.map_index): ti for ti in 
decision.schedulable_tis}
       
           # Initial decision.
           tis = _one_scheduling_decision_iteration()
           assert sorted(tis) == [("tg.t1", 0), ("tg.t1", 1), ("tg.t1", 2)]
       
           # After running the first t1, the first t2 becomes immediately 
available.
           tis["tg.t1", 0].run()
           tis = _one_scheduling_decision_iteration()
           assert sorted(tis) == [("tg.t1", 1), ("tg.t1", 2), ("tg.t2", 0)]
       
           # Similarly for the subsequent t2 instances.
           tis["tg.t1", 2].run()
           tis = _one_scheduling_decision_iteration()
           assert sorted(tis) == [("tg.t1", 1), ("tg.t2", 0), ("tg.t2", 2)]
       
           # But running t2 partially does not make t3 available.
           tis["tg.t1", 1].run()
           tis["tg.t2", 0].run()
           tis["tg.t2", 2].run()
           tis = _one_scheduling_decision_iteration()
           assert sorted(tis) == [("tg.t2", 1)]
       
           # Only after all t2 instances are run does t3 become available.
   >       tis["tg.t2", 1].run()
   
   tests/ti_deps/deps/test_trigger_rule_dep.py:1158: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   airflow/utils/session.py:77: in wrapper
       return func(*args, session=session, **kwargs)
   airflow/models/taskinstance.py:1840: in run
       self._run_raw_task(
   airflow/utils/session.py:74: in wrapper
       return func(*args, **kwargs)
   airflow/models/taskinstance.py:1514: in _run_raw_task
       context = self.get_template_context(ignore_param_exceptions=False)
   airflow/models/taskinstance.py:2204: in get_template_context
       "prev_data_interval_start_success": 
get_prev_data_interval_start_success(),
   airflow/models/taskinstance.py:2077: in get_prev_data_interval_start_success
       data_interval = _get_previous_dagrun_data_interval_success()
   airflow/models/taskinstance.py:2071: in 
_get_previous_dagrun_data_interval_success
       dagrun = _get_previous_dagrun_success()
   airflow/models/taskinstance.py:2068: in _get_previous_dagrun_success
       return self.get_previous_dagrun(state=DagRunState.SUCCESS, 
session=session)
   airflow/utils/session.py:74: in wrapper
       return func(*args, **kwargs)
   airflow/models/taskinstance.py:1024: in get_previous_dagrun
       last_dagrun = dr.get_previous_dagrun(session=session, state=state)
   airflow/utils/session.py:74: in wrapper
       return func(*args, **kwargs)
   airflow/models/dagrun.py:533: in get_previous_dagrun
       return 
session.scalar(select(DagRun).where(*filters).order_by(DagRun.execution_date.desc()))
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1747: in 
scalar
       return self.execute(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1716: in 
execute
       conn = self._connection_for_bind(bind)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1555: in 
_connection_for_bind
       return self._transaction._connection_for_bind(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:750: in 
_connection_for_bind
       conn = bind.connect()
   /usr/local/lib/python3.8/site-packages/sqlalchemy/future/engine.py:406: in 
connect
       return super(Engine, self).connect()
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3325: in 
connect
       return self._connection_cls(self, close_with_result=close_with_result)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:96: in 
__init__
       else engine.raw_connection()
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3404: in 
raw_connection
       return self._wrap_pool_connect(self.pool.connect, _connection)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3374: in 
_wrap_pool_connect
       Connection._handle_dbapi_exception_noconnection(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2208: in 
_handle_dbapi_exception_noconnection
       util.raise_(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in 
raise_
       raise exception
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3371: in 
_wrap_pool_connect
       return fn()
   /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:327: in 
connect
       return _ConnectionFairy._checkout(self)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:995: in 
_checkout
       del fairy
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in 
__exit__
       compat.raise_(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in 
raise_
       raise exception
   /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:928: in 
_checkout
       result = pool._dialect.do_ping(fairy.dbapi_connection)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 
0x7fc5e0105370>
   dbapi_connection = <connection object at 0x7fc5df537180; dsn: 'user=postgres 
*** dbname=airflow host=postgres', closed: 1>
   
       def do_ping(self, dbapi_connection):
           cursor = None
           before_autocommit = dbapi_connection.autocommit
           try:
               if not before_autocommit:
                   dbapi_connection.autocommit = True
               cursor = dbapi_connection.cursor()
               try:
                   cursor.execute(self._dialect_specific_select_one)
               finally:
                   cursor.close()
                   if not before_autocommit and not dbapi_connection.closed:
   >                   dbapi_connection.autocommit = before_autocommit
   E                   sqlalchemy.exc.ProgrammingError: 
(psycopg2.ProgrammingError) set_session cannot be used inside a transaction
   E                   (Background on this error at: 
https://sqlalche.me/e/14/f405)
   
   
/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py:877:
 ProgrammingError
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to