pierrejeambrun commented on code in PR #46626:
URL: https://github.com/apache/airflow/pull/46626#discussion_r1955873573
##########
airflow/models/dagrun.py:
##########
@@ -300,6 +297,21 @@ def validate_run_id(self, key: str, run_id: str) -> str |
None:
f"The run_id provided '{run_id}' does not match regex pattern
'{regex}' or '{RUN_ID_REGEX}'"
)
+ @provide_session
+ def dag_versions(self, session: Session = NEW_SESSION) -> list[DagVersion]:
+ """
+ Return the DAG versions associated with the TIs of this DagRun.
+
+ :param session: database session
+ """
+ tis = session.scalars(
+ select(TI.dag_version_id).where(TI.run_id == self.run_id,
TI.dag_id == self.dag_id).distinct()
+ ).all()
+ tih = session.scalars(
+ select(TIH.dag_version_id).where(TIH.run_id == self.run_id,
TIH.dag_id == self.dag_id).distinct()
+ ).all()
+ return list(tis + tih)
Review Comment:
Here is an example using a relationship. (I'm sure we could achieve the same
with hybrid properties or column properties, but here it is.)
On the DagRun model, add the TIH relationship:
```python
task_instances_history = relationship(
    TIH,
    primaryjoin=(
        "and_(foreign(TaskInstanceHistory.run_id) == DagRun.run_id, "
        "foreign(TaskInstanceHistory.dag_id) == DagRun.dag_id)"
    ),
    viewonly=True,
)
```
For convenience I added two association proxies (they can be omitted, since
they are only used internally by the DagRun class and are not public):
```python
_ti_dag_versions = association_proxy("task_instances", "dag_version")
_tih_dag_versions = association_proxy("task_instances_history", "dag_version")
```
Then add the 'normal' Python property that deduplicates and combines all the
versions encountered in the TI and TIH relationships. All of that is done in
memory, in application code, on already loaded instances. I'm sure we could
do it directly in SQL, but I don't think it's worth the pain to go further
(the cost is really low at this point).
```python
@property
def dag_versions(self) -> list[DagVersion]:
    return list(set(self._ti_dag_versions or []) | set(self._tih_dag_versions or []))
```
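To make the in-memory deduplication concrete, here is a minimal, ORM-free sketch of the same idea. `DagVersionStub`, `TaskInstanceLike` and `DagRunLike` are hypothetical stand-ins, not Airflow classes. The `None` filtering and the sort by `version_number` are my own assumptions and are not part of the property above: a set has no stable order, and I'm assuming a TI may have no `dag_version`.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class DagVersionStub:
    """Hypothetical stand-in for DagVersion; frozen so it is hashable."""

    version_number: int


@dataclass
class TaskInstanceLike:
    """Hypothetical stand-in for a TI or TIH row; dag_version may be missing."""

    dag_version: DagVersionStub | None


@dataclass
class DagRunLike:
    """Hypothetical stand-in for DagRun with both collections already loaded."""

    task_instances: list[TaskInstanceLike] = field(default_factory=list)
    task_instances_history: list[TaskInstanceLike] = field(default_factory=list)

    @property
    def dag_versions(self) -> list[DagVersionStub]:
        # Union of the versions seen on current and historical task instances,
        # skipping rows that carry no version (assumption, see lead-in).
        versions = {
            ti.dag_version
            for ti in (*self.task_instances, *self.task_instances_history)
            if ti.dag_version is not None
        }
        # Sort so the result is deterministic (assumption, see lead-in).
        return sorted(versions, key=lambda v: v.version_number)


v1, v2 = DagVersionStub(1), DagVersionStub(2)
run = DagRunLike(
    task_instances=[TaskInstanceLike(v2), TaskInstanceLike(v2), TaskInstanceLike(None)],
    task_instances_history=[TaskInstanceLike(v1), TaskInstanceLike(v2)],
)
print([v.version_number for v in run.dag_versions])  # [1, 2]
```

Whether the real property needs the `None` filter or a fixed ordering depends on how callers use the result, so treat both as options rather than requirements.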
Test code: we can just eager-load the TI and TIH relationships, so a single
query should be emitted:
```
with assert_queries_count(1):
    dag_runs = (
        session.scalars(
            select(DagRun).options(
                joinedload(DagRun.task_instances).joinedload(TaskInstance.dag_version),
                joinedload(DagRun.task_instances_history).joinedload(TaskInstanceHistory.dag_version),
            )
        )
        .unique()
        .all()
    )
    print(f"{len(dag_runs)} DAG Runs loaded")
    for dag_run in dag_runs:
        print(f"{dag_run.dag_id}: {len(dag_run.dag_versions)}")
        if dag_run.dag_id == "example_branch_operator":
            print([dag_version.version_number for dag_version in dag_run.dag_versions])
```
On a small sample with task retries, I get:
### With loading options:

### Without loading options:

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]