potiuk commented on issue #33178: URL: https://github.com/apache/airflow/issues/33178#issuecomment-1668122970
Another incarnation of the same problem (SQLite produces different error): https://github.com/apache/airflow/actions/runs/5786336184/job/15681127372?pr=33144#step:6:9388 ``` def test_xcom_map_error_fails_task(dag_maker, session): with dag_maker(session=session) as dag: @dag.task() def push(): return ["a", "b", "c"] @dag.task() def pull(value): print(value) def does_not_work_with_c(v): if v == "c": raise ValueError("nope") return {"value": v * 2} pull.expand_kwargs(push().map(does_not_work_with_c)) dr = dag_maker.create_dagrun() # The "push" task should not fail. decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: ti.run(session=session) assert [ti.state for ti in decision.schedulable_tis] == [TaskInstanceState.SUCCESS] # Prepare to run "pull"... decision = dr.task_instance_scheduling_decisions(session=session) tis = {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis} # The first two "pull" tis should also succeed. > tis[("pull", 0)].run(session=session) tests/models/test_xcom_arg_map.py:174: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ airflow/utils/session.py:74: in wrapper return func(*args, **kwargs) airflow/models/taskinstance.py:1840: in run self._run_raw_task( airflow/utils/session.py:74: in wrapper return func(*args, **kwargs) airflow/models/taskinstance.py:1494: in _run_raw_task session.commit() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit self._transaction.commit(_to_root=self.future) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit self._prepare_impl() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl self.session.flush() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush self._flush(objects) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush transaction.rollback(_capture_exception=True) /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__ compat.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3549: in _flush flush_context.execute() /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute rec.execute(self) /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute util.preloaded.orm_persistence.save_obj( /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj _emit_update_statements( /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements c = connection._execute_20( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) /usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection return connection._execute_clauseelement( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement ret = self._execute_context( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context self._handle_dbapi_exception( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception util.raise_( /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_ raise exception /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context self.dialect.do_execute( /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: in do_execute cursor.execute(statement, parameters) /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:174: in execute self._discard() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <MySQLdb.cursors.Cursor object at 0x7f52bc978a60> def _discard(self): self.description = None self.description_flags = None # Django uses some member after __exit__. # So we keep rowcount and lastrowid here. They are cleared in Cursor._query(). # self.rowcount = 0 # self.lastrowid = None self._rows = None self.rownumber = None if self._result: self._result.discard() self._result = None con = self.connection if con is None: return > while con.next_result() == 0: # -1 means no more data. E sqlalchemy.exc.ProgrammingError: (MySQLdb.ProgrammingError) (2014, "Commands out of sync; you can't run this command now") E [SQL: UPDATE task_instance SET pid=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s] E [parameters: (90, datetime.datetime(2023, 8, 7, 14, 44, 7, 580365), 'test_dag', 'pull', 'test', 0)] E (Background on this error at: https://sqlalche.me/e/14/f405) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
