jedcunningham commented on code in PR #46626:
URL: https://github.com/apache/airflow/pull/46626#discussion_r1958981148
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1675,19 +1677,20 @@ def _verify_integrity_if_dag_changed(self, dag_run:
DagRun, session: Session) ->
latest_dag_version = DagVersion.get_latest_version(dag_run.dag_id,
session=session)
if TYPE_CHECKING:
assert latest_dag_version
- if dag_run.dag_version_id == latest_dag_version.id:
+
+ if latest_dag_version in dag_run.dag_versions:
self.log.debug("DAG %s not changed structure, skipping
dagrun.verify_integrity", dag_run.dag_id)
return True
-
# Refresh the DAG
dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id,
session=session)
if not dag_run.dag:
return False
-
- dag_run.dag_version = latest_dag_version
-
+ # Select all TIs in State.unfinished and update the dag_version_id
+ for ti in dag_run.task_instances:
+ if ti.state in State.unfinished:
+ ti.dag_version = latest_dag_version
Review Comment:
I think it's fine, I believe this isn't lazy loaded, right? If so we should
refactor. Can you just double check that before merging?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]