dstandish commented on code in PR #59115:
URL: https://github.com/apache/airflow/pull/59115#discussion_r2698745380


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1765,24 +1767,48 @@ def _create_dag_runs(self, dag_models: 
Collection[DagModel], session: Session) -
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we 
don't attempt to create
         # duplicate DagRuns
-        existing_dagruns = (
-            session.execute(
-                select(DagRun.dag_id, DagRun.logical_date).where(
+        existing_dagrun_objects = (
+            session.scalars(
+                select(DagRun)
+                .where(
                     tuple_(DagRun.dag_id, DagRun.logical_date).in_(
                         (dm.dag_id, dm.next_dagrun) for dm in dag_models
-                    ),
+                    )
                 )
+                .options(load_only(DagRun.dag_id, DagRun.logical_date))
             )
             .unique()
             .all()
         )
+        existing_dagruns = {(x.dag_id, x.logical_date): x for x in 
existing_dagrun_objects}

Review Comment:
   what do we need ... what?  this is a dictionary that is later used to find 
the dr for existing run
   
   it's basically for when something went wrong, scheduler trying to create 
not-latest run, and we need to advance next_dagrun on DagModel



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to