pierrejeambrun commented on code in PR #46484:
URL: https://github.com/apache/airflow/pull/46484#discussion_r1946772748


##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -63,15 +65,85 @@
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import ParamValidationError
 from airflow.listeners.listener import get_listener_manager
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun, TaskInstance
 from airflow.models.dag_version import DagVersion
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.timetables.base import DataInterval
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 dag_run_router = AirflowRouter(tags=["DagRun"], 
prefix="/dags/{dag_id}/dagRuns")
 
 
+def get_dag_run_with_task_ids(dag_id: str, run_id: str, session: SessionDep) 
-> DagRun:
+    """Get the DagRun with the given task_ids."""
+    return session.scalar(
+        select(DagRun)
+        .options(joinedload(DagRun.task_instances).load_only("task_id"))
+        .filter_by(dag_id=dag_id, run_id=run_id)
+    )
+
+
+def get_dag_version_ids_among_dag_run(dag_id: str, dag_run_id: str, task_ids: 
list[str], session: SessionDep):
+    """Get the DagVersions from the TaskInstances and TaskInstanceHistories of 
the DagRun."""
+    task_instance_dag_version_ids = select(TaskInstance.dag_version_id).where(
+        TaskInstance.dag_id == dag_id, TaskInstance.run_id == dag_run_id, 
TaskInstance.task_id.in_(task_ids)
+    )
+    task_instance_history_dag_version_ids = 
select(TaskInstanceHistory.dag_version_id).where(
+        TaskInstanceHistory.dag_id == dag_id,
+        TaskInstanceHistory.run_id == dag_run_id,
+        TaskInstanceHistory.task_id.in_(task_ids),
+    )
+    return (
+        session.execute(
+            
task_instance_dag_version_ids.distinct().union(task_instance_history_dag_version_ids.distinct())
+        )
+        .scalars()
+        .all()
+    )

Review Comment:
   Hello @jason810496,
   
   To not make this issue too big / hard, @ephraimbuddy suggested to help on 
the ORM / database side (migration, ORM update etc...). I created another issue 
for that https://github.com/apache/airflow/issues/46565. Once Ephraim is done 
working on that, you can continue integrating the change into the RestAPI.
   
   Does that work for you ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to