jedcunningham commented on code in PR #60956:
URL: https://github.com/apache/airflow/pull/60956#discussion_r2718853823


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2275,10 +2275,20 @@ def _verify_integrity_if_dag_changed(self, dag_run: 
DagRun, session: Session) ->
         dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, 
session=session)
         if not dag_run.dag:
             return False
-        # Select all TIs in State.unfinished and update the dag_version_id
-        for ti in dag_run.task_instances:
-            if ti.state in State.unfinished:
-                ti.dag_version = latest_dag_version
+        # Bulk update dag_version_id for unfinished TIs instead of loading all 
TIs into memory.
+        # Use synchronize_session=False since we handle cache coherence via 
session.expire() below.
+        session.execute(
+            update(TI)
+            .where(
+                TI.dag_id == dag_run.dag_id,
+                TI.run_id == dag_run.run_id,
+                TI.state.in_(State.unfinished),
+            )
+            .values(dag_version_id=latest_dag_version.id),
+            execution_options={"synchronize_session": False},
+        )

Review Comment:
   _Imports are there!_



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to