Copilot commented on code in PR #60956:
URL: https://github.com/apache/airflow/pull/60956#discussion_r2718849787


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2275,10 +2275,20 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) ->
         dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
         if not dag_run.dag:
             return False
-        # Select all TIs in State.unfinished and update the dag_version_id
-        for ti in dag_run.task_instances:
-            if ti.state in State.unfinished:
-                ti.dag_version = latest_dag_version
+        # Bulk update dag_version_id for unfinished TIs instead of loading all TIs into memory.
+        # Use synchronize_session=False since we handle cache coherence via session.expire() below.
+        session.execute(
+            update(TI)
+            .where(
+                TI.dag_id == dag_run.dag_id,
+                TI.run_id == dag_run.run_id,
+                TI.state.in_(State.unfinished),
+            )
+            .values(dag_version_id=latest_dag_version.id),
+            execution_options={"synchronize_session": False},
+        )

Review Comment:
   The bulk UPDATE statement uses `TI` and `update()`, but this diff does not
   show their imports. Ensure both are imported at the top of the file (likely
   `from airflow.models.taskinstance import TaskInstance as TI` and
   `from sqlalchemy import update`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to