ephraimbuddy commented on code in PR #46626:
URL: https://github.com/apache/airflow/pull/46626#discussion_r1959561256


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1675,19 +1677,20 @@ def _verify_integrity_if_dag_changed(self, dag_run: 
DagRun, session: Session) ->
         latest_dag_version = DagVersion.get_latest_version(dag_run.dag_id, 
session=session)
         if TYPE_CHECKING:
             assert latest_dag_version
-        if dag_run.dag_version_id == latest_dag_version.id:
+
+        if latest_dag_version in dag_run.dag_versions:
             self.log.debug("DAG %s not changed structure, skipping 
dagrun.verify_integrity", dag_run.dag_id)
             return True
-
         # Refresh the DAG
         dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, 
session=session)
         if not dag_run.dag:
             return False
-
-        dag_run.dag_version = latest_dag_version
-
+        # Select all TIs in State.unfinished and update the dag_version_id
+        for ti in dag_run.task_instances:
+            if ti.state in State.unfinished:
+                ti.dag_version = latest_dag_version

Review Comment:
   joinedloaded it in 
https://github.com/apache/airflow/pull/46626/commits/e43930f354b344d51d3622b7d110064f272f8891



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to