Prab-27 commented on code in PR #66765:
URL: https://github.com/apache/airflow/pull/66765#discussion_r3254795196


##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1423,6 +1423,48 @@ def test_ti_update_state_database_error(self, client, 
session, create_task_insta
             assert response.status_code == 500
             assert response.json()["detail"] == "Database error occurred"
 
+    def test_ti_update_state_terminal_does_not_lock_dag_run(self, client, 
session, create_task_instance):
+        """
+        Regression guard: session.get(TI, pk, with_for_update=True) must use
+        options=[lazyload(TI.dag_run)] to avoid inadvertently locking dag_run.
+
+        TaskInstance.dag_run has lazy="joined", so without the lazyload 
override the
+        ORM emits FOR UPDATE on both task_instance and dag_run.  The scheduler 
holds
+        a dag_run lock while bulk-updating task_instance rows in
+        _verify_integrity_if_dag_changed, producing a lock-order inversion 
deadlock.
+        """
+        from sqlalchemy.orm import lazyload
+
+        ti = create_task_instance(
+            task_id="test_ti_update_state_no_dag_run_lock",
+            state=State.RUNNING,
+            start_date=DEFAULT_START_DATE,
+        )
+        session.commit()

Review Comment:
   Yes ! we need this `session.commit() ` here ! Thanks !!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to