pierrejeambrun commented on code in PR #46626:
URL: https://github.com/apache/airflow/pull/46626#discussion_r1955873573


##########
airflow/models/dagrun.py:
##########
@@ -300,6 +297,21 @@ def validate_run_id(self, key: str, run_id: str) -> str | None:
             f"The run_id provided '{run_id}' does not match regex pattern '{regex}' or '{RUN_ID_REGEX}'"
         )
 
+    @provide_session
+    def dag_versions(self, session: Session = NEW_SESSION) -> list[DagVersion]:
+        """
+        Return the DAG versions associated with the TIs of this DagRun.
+
+        :param session: database session
+        """
+        tis = session.scalars(
+            select(TI.dag_version_id).where(TI.run_id == self.run_id, TI.dag_id == self.dag_id).distinct()
+        ).all()
+        tih = session.scalars(
+            select(TIH.dag_version_id).where(TIH.run_id == self.run_id, TIH.dag_id == self.dag_id).distinct()
+        ).all()
+        return list(tis + tih)

Review Comment:
   Here is an example using relationships. (I'm sure the same result can be achieved with hybrid properties or column properties, but this is the relationship version.)
   
   On the `DagRun` model, add the TIH relationship:
   ```python
       task_instances_history = relationship(
           TIH,
           primaryjoin="and_(foreign(TaskInstanceHistory.run_id) == DagRun.run_id, foreign(TaskInstanceHistory.dag_id) == DagRun.dag_id)",
           viewonly=True,
       )
   ```
   
   For convenience, I added two association proxies (they can be omitted, since they are only used internally and are not exposed outside the `DagRun` class):
   ```python
       _ti_dag_versions = association_proxy("task_instances", "dag_version")
       _tih_dag_versions = association_proxy("task_instances_history", "dag_version")
   ```
   
   Then a 'normal' Python property deduplicates and merges all the versions encountered in the TI and TIH relationships. All of this happens in application code, in memory, on already loaded instances. I'm sure we could do it directly in SQL, but I don't think it's worth the pain to go further (the cost is really low at this point).
   ```python
       @property
       def dag_versions(self) -> list[DagVersion]:
           return list(set(self._ti_dag_versions or []) | set(self._tih_dag_versions or []))
   ```
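
To make the deduplication step concrete, here is a minimal standalone sketch that compares plain concatenation (the shape of `list(tis + tih)` in the diff) with the set union used by the property. The `Version` dataclass and both helper functions are hypothetical stand-ins, not Airflow code. The sketch relies on value equality from a frozen dataclass; with ORM objects, the union works because a session's identity map returns the same Python object for the same row.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    """Hypothetical stand-in for DagVersion (hashable, compared by value)."""

    version_number: int


def concat_versions(ti_versions, tih_versions):
    """Plain concatenation: a version present in both lists appears twice."""
    return list(ti_versions) + list(tih_versions)


def union_versions(ti_versions, tih_versions):
    """Set union, as in the property: each version appears once, order not guaranteed."""
    return list(set(ti_versions or []) | set(tih_versions or []))


v1, v2 = Version(1), Version(2)
ti = [v2]  # versions seen on current task instances
tih = [v1, v2]  # versions seen on task instance history (e.g. retries)

concatenated = concat_versions(ti, tih)
# Sort only to get a stable order for display; a set has no defined order.
deduplicated = sorted(union_versions(ti, tih), key=lambda v: v.version_number)

print(len(concatenated), [v.version_number for v in deduplicated])
```

Because the union goes through a `set`, callers that need a stable order (for example by version number) would have to sort the result themselves.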
   
   
   Test code: we can simply eager-load the TI and TIH relationships. Only one query should be emitted:
   ```python
       with assert_queries_count(1):
           dag_runs = (
               session.scalars(
                   select(DagRun).options(
                       joinedload(DagRun.task_instances).joinedload(TaskInstance.dag_version),
                       joinedload(DagRun.task_instances_history).joinedload(TaskInstanceHistory.dag_version),
                   )
               )
               .unique()
               .all()
           )
           print(f"{len(dag_runs)} DAG Runs loaded")
   
           for dag_run in dag_runs:
               print(f"{dag_run.dag_id}: {len(dag_run.dag_versions)}")
   
               if dag_run.dag_id == "example_branch_operator":
                   print([dag_version.version_number for dag_version in dag_run.dag_versions])
   ```
   On a small sample with task retries, I get:
   
   ### With loading options:
   ![Screenshot 2025-02-14 at 10 59 11](https://github.com/user-attachments/assets/6d590600-7499-49a5-8910-c31ded5827a4)
   
   
   ### Without loading options:
   ![Screenshot 2025-02-14 at 10 56 55](https://github.com/user-attachments/assets/30649289-37d8-4531-865c-8903ccd512b0)
   


