ephraimbuddy opened a new pull request, #42928: URL: https://github.com/apache/airflow/pull/42928
Previously, separate sessions were used for writing and deleting RTIFs (`RenderedTaskInstanceFields`). Currently, a single session is used for both the write and the deletion, which we suspect is the cause of the `StaleDataError` shown below.

Related PR: https://github.com/apache/airflow/pull/38565

This PR brings back the old behaviour of using different sessions for writing and deleting RTIFs.

The error:

```
[2024-10-08T14:58:00.817+0000] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 3122, in _execute_task_with_callbacks
    _update_rtif(ti=self, rendered_fields=rendered_fields)
  File "/usr/local/lib/python3.10/site-packages/airflow/api_internal/internal_api_call.py", line 139, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1642, in _update_rtif
    RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/renderedtifields.py", line 271, in delete_old_records
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
    raise orm_exc.StaleDataError(
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'rendered_task_instance_fields' expected to update 1 row(s); 0 were matched.
```

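For illustration only, here is a minimal sketch of the "separate unit of work for the write and for the delete" idea. It is not the code in this PR: it swaps SQLAlchemy sessions for plain `sqlite3` connections from the standard library so that it runs anywhere, and the `rtif` table, `write_rtif`, `delete_old_records`, and `update_rtif` are hypothetical stand-ins rather than Airflow's actual schema or functions. It does not reproduce the `StaleDataError`; it only shows the write being committed before a second, independent connection prunes old rows.

```python
import sqlite3
from contextlib import closing

# Hypothetical stand-in for the rendered_task_instance_fields table.
SCHEMA = """
CREATE TABLE IF NOT EXISTS rtif (
    dag_id   TEXT NOT NULL,
    task_id  TEXT NOT NULL,
    run_id   TEXT NOT NULL,
    rendered TEXT NOT NULL,
    PRIMARY KEY (dag_id, task_id, run_id)
)
"""


def write_rtif(db_path, dag_id, task_id, run_id, rendered):
    """Write one row using its own connection and transaction."""
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back on an exception
            conn.execute(SCHEMA)
            conn.execute(
                "INSERT OR REPLACE INTO rtif VALUES (?, ?, ?, ?)",
                (dag_id, task_id, run_id, rendered),
            )


def delete_old_records(db_path, dag_id, task_id, keep):
    """Delete all but the `keep` newest rows using a second connection.

    Assumes the table already exists (write_rtif creates it).
    Returns the number of rows deleted.
    """
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            cur = conn.execute(
                """
                DELETE FROM rtif
                WHERE dag_id = ? AND task_id = ?
                  AND run_id NOT IN (
                      SELECT run_id FROM rtif
                      WHERE dag_id = ? AND task_id = ?
                      ORDER BY run_id DESC
                      LIMIT ?
                  )
                """,
                (dag_id, task_id, dag_id, task_id, keep),
            )
            return cur.rowcount


def update_rtif(db_path, dag_id, task_id, run_id, rendered, keep=2):
    """Write first, then prune; the two steps never share a connection."""
    write_rtif(db_path, dag_id, task_id, run_id, rendered)
    return delete_old_records(db_path, dag_id, task_id, keep)
```

Because the write is committed and its connection closed before the delete starts, the delete step never flushes pending state left over from the write. That separation is the property this PR restores by giving the two operations different sessions.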