potiuk commented on issue #33178: URL: https://github.com/apache/airflow/issues/33178#issuecomment-1668144144
Possibly related case: https://github.com/apache/airflow/actions/runs/5778361625/job/15659550497#step:5:8291 This is `test_upstream_in_mapped_group_triggers_only_relevant`, but likely has a similar root cause (SQL commands being out of sync/in a wrong sequence). ``` _____________ test_upstream_in_mapped_group_triggers_only_relevant _____________ self = Engine(***postgres/airflow) fn = <bound method Pool.connect of <sqlalchemy.pool.impl.QueuePool object at 0x7fc52cc83700>> connection = None def _wrap_pool_connect(self, fn, connection): dialect = self.dialect try: > return fn() /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3371: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:327: in connect return _ConnectionFairy._checkout(self) /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:995: in _checkout del fairy /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__ compat.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:928: in _checkout result = pool._dialect.do_ping(fairy.dbapi_connection) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7fc5e0105370> dbapi_connection = <connection object at 0x7fc5df537180; dsn: 'user=postgres *** dbname=airflow host=postgres', closed: 1> def do_ping(self, dbapi_connection): cursor = None before_autocommit = dbapi_connection.autocommit try: if not before_autocommit: dbapi_connection.autocommit = True cursor = dbapi_connection.cursor() try: cursor.execute(self._dialect_specific_select_one) finally: cursor.close() if not before_autocommit and not dbapi_connection.closed: > dbapi_connection.autocommit = before_autocommit E psycopg2.ProgrammingError: set_session 
cannot be used inside a transaction /usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py:877: ProgrammingError The above exception was the direct cause of the following exception: dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7fc525ff6ee0> session = <sqlalchemy.orm.session.Session object at 0x7fc5c0e661c0> def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session): from airflow.decorators import task, task_group with dag_maker(session=session): @task def t(x): return x @task_group def tg(x): t1 = t.override(task_id="t1")(x=x) return t.override(task_id="t2")(x=t1) t2 = tg.expand(x=[1, 2, 3]) t.override(task_id="t3")(x=t2) dr: DagRun = dag_maker.create_dagrun() def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]: decision = dr.task_instance_scheduling_decisions(session=session) return {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis} # Initial decision. tis = _one_scheduling_decision_iteration() assert sorted(tis) == [("tg.t1", 0), ("tg.t1", 1), ("tg.t1", 2)] # After running the first t1, the first t2 becomes immediately available. tis["tg.t1", 0].run() tis = _one_scheduling_decision_iteration() assert sorted(tis) == [("tg.t1", 1), ("tg.t1", 2), ("tg.t2", 0)] # Similarly for the subsequent t2 instances. tis["tg.t1", 2].run() tis = _one_scheduling_decision_iteration() assert sorted(tis) == [("tg.t1", 1), ("tg.t2", 0), ("tg.t2", 2)] # But running t2 partially does not make t3 available. tis["tg.t1", 1].run() tis["tg.t2", 0].run() tis["tg.t2", 2].run() tis = _one_scheduling_decision_iteration() assert sorted(tis) == [("tg.t2", 1)] # Only after all t2 instances are run does t3 become available. 
> tis["tg.t2", 1].run() tests/ti_deps/deps/test_trigger_rule_dep.py:1158: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ airflow/utils/session.py:77: in wrapper return func(*args, session=session, **kwargs) airflow/models/taskinstance.py:1840: in run self._run_raw_task( airflow/utils/session.py:74: in wrapper return func(*args, **kwargs) airflow/models/taskinstance.py:1514: in _run_raw_task context = self.get_template_context(ignore_param_exceptions=False) airflow/models/taskinstance.py:2204: in get_template_context "prev_data_interval_start_success": get_prev_data_interval_start_success(), airflow/models/taskinstance.py:2077: in get_prev_data_interval_start_success data_interval = _get_previous_dagrun_data_interval_success() airflow/models/taskinstance.py:2071: in _get_previous_dagrun_data_interval_success dagrun = _get_previous_dagrun_success() airflow/models/taskinstance.py:2068: in _get_previous_dagrun_success return self.get_previous_dagrun(state=DagRunState.SUCCESS, session=session) airflow/utils/session.py:74: in wrapper return func(*args, **kwargs) airflow/models/taskinstance.py:1024: in get_previous_dagrun last_dagrun = dr.get_previous_dagrun(session=session, state=state) airflow/utils/session.py:74: in wrapper return func(*args, **kwargs) airflow/models/dagrun.py:533: in get_previous_dagrun return session.scalar(select(DagRun).where(*filters).order_by(DagRun.execution_date.desc())) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1747: in scalar return self.execute( /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1716: in execute conn = self._connection_for_bind(bind) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1555: in _connection_for_bind return self._transaction._connection_for_bind( /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:750: in _connection_for_bind conn = bind.connect() 
/usr/local/lib/python3.8/site-packages/sqlalchemy/future/engine.py:406: in connect return super(Engine, self).connect() /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3325: in connect return self._connection_cls(self, close_with_result=close_with_result) /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:96: in __init__ else engine.raw_connection() /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3404: in raw_connection return self._wrap_pool_connect(self.pool.connect, _connection) /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3374: in _wrap_pool_connect Connection._handle_dbapi_exception_noconnection( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2208: in _handle_dbapi_exception_noconnection util.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:3371: in _wrap_pool_connect return fn() /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:327: in connect return _ConnectionFairy._checkout(self) /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:995: in _checkout del fairy /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__ compat.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py:928: in _checkout result = pool._dialect.do_ping(fairy.dbapi_connection) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7fc5e0105370> dbapi_connection = <connection object at 0x7fc5df537180; dsn: 'user=postgres *** dbname=airflow host=postgres', closed: 1> def do_ping(self, dbapi_connection): cursor = None before_autocommit = dbapi_connection.autocommit try: if not before_autocommit: dbapi_connection.autocommit 
= True cursor = dbapi_connection.cursor() try: cursor.execute(self._dialect_specific_select_one) finally: cursor.close() if not before_autocommit and not dbapi_connection.closed: > dbapi_connection.autocommit = before_autocommit E sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) set_session cannot be used inside a transaction E (Background on this error at: https://sqlalche.me/e/14/f405) /usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py:877: ProgrammingError ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
