sudarshan2906 opened a new issue #13811:
URL: https://github.com/apache/airflow/issues/13811


   **Apache Airflow version**: 2.0
   
   - **OS** (e.g. from /etc/os-release): 
   
   PRETTY_NAME="Debian GNU/Linux 10 (buster)"
   NAME="Debian GNU/Linux"
   VERSION_ID="10"
   VERSION="10 (buster)"
   VERSION_CODENAME=buster
   ID=debian
   
   **What happened**:
   I am using a `task_instance_mutation_hook` cluster policy. Inside the hook, I 
call `get_dagrun()` on the task instance to get the DAG run object for that 
task instance. This worked fine in 1.10.14, but in 2.0 it raises the following 
error, which prevents the scheduler from starting.
   
   ```
   dag_run = task_instance.get_dagrun()
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.7/contextlib.py", line 119, in __exit__
       next(self.gen)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 32, in create_session
       session.commit()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
       self.transaction.commit()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 472, in _prepare_impl
       self.session.dispatch.before_commit(self.session)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
       fn(*args, **kw)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py", line 217, in _validate_commit
       raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
   ```
   I use the DAG run object to read the conf passed to the DAG run, and I set 
some properties of the task_instance based on it.
   
   **How to reproduce it**:
   Example of the cluster policy used:
   ```
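   def task_instance_mutation_hook(task_instance):
       # In 2.0 this call raises the RuntimeError shown in the traceback above
       dag_run = task_instance.get_dagrun()
       # conf passed to the DAG run; used to set task_instance properties
       conf = dag_run.conf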
   ```
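
The traceback suggests the failure path: `get_dagrun()` is called without a session, the session wrapper opens a new one through `create_session`, and the commit on exit trips a `before_commit` check. The following is a simplified, dependency-free model of that path, not Airflow's actual code. `FakeSession`, `FakeTaskInstance`, `commits_prohibited`, and the sample conf are hypothetical stand-ins, and `provide_session` / `create_session` here are rough approximations inferred from the traceback.

```python
import contextlib
import functools


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    commits_forbidden = False  # True while the modelled scheduler guards its locks

    def commit(self):
        # Models the before_commit check seen at the bottom of the traceback.
        if FakeSession.commits_forbidden:
            raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")


@contextlib.contextmanager
def create_session():
    """Yield a fresh session and commit it on exit."""
    session = FakeSession()
    yield session
    session.commit()


def provide_session(func):
    """Open (and later commit) a new session only if the caller passed none."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            return func(*args, session=session, **kwargs)
        with create_session() as new_session:
            return func(*args, session=new_session, **kwargs)
    return wrapper


class FakeTaskInstance:
    """Hypothetical task instance with a session-wrapped get_dagrun()."""

    @provide_session
    def get_dagrun(self, session=None):
        return {"conf": {"priority": "high"}}  # made-up conf for illustration


@contextlib.contextmanager
def commits_prohibited():
    """Hypothetical guard: any commit inside this block raises."""
    FakeSession.commits_forbidden = True
    try:
        yield
    finally:
        FakeSession.commits_forbidden = False


def task_instance_mutation_hook(task_instance):
    # No session is passed, so the wrapper creates one and commits it.
    dag_run = task_instance.get_dagrun()
    return dag_run["conf"]


if __name__ == "__main__":
    ti = FakeTaskInstance()
    with commits_prohibited():
        try:
            task_instance_mutation_hook(ti)
        except RuntimeError as exc:
            print("hook failed:", exc)
```

In this model, calling `get_dagrun(session=some_session)` skips the implicit commit, so the guard is not triggered. Whether a suitable session is available to the hook in Airflow 2.0 is not something this sketch establishes.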
   

