jason810496 commented on code in PR #46484:
URL: https://github.com/apache/airflow/pull/46484#discussion_r1946779881
##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -63,15 +65,85 @@
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.models.dag_version import DagVersion
+from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.timetables.base import DataInterval
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
dag_run_router = AirflowRouter(tags=["DagRun"],
prefix="/dags/{dag_id}/dagRuns")
+def get_dag_run_with_task_ids(dag_id: str, run_id: str, session: SessionDep)
-> DagRun:
+ """Get the DagRun with the given task_ids."""
+ return session.scalar(
+ select(DagRun)
+ .options(joinedload(DagRun.task_instances).load_only("task_id"))
+ .filter_by(dag_id=dag_id, run_id=run_id)
+ )
+
+
+def get_dag_version_ids_among_dag_run(dag_id: str, dag_run_id: str, task_ids:
list[str], session: SessionDep):
+ """Get the DagVersions from the TaskInstances and TaskInstanceHistories of
the DagRun."""
+ task_instance_dag_version_ids = select(TaskInstance.dag_version_id).where(
+ TaskInstance.dag_id == dag_id, TaskInstance.run_id == dag_run_id,
TaskInstance.task_id.in_(task_ids)
+ )
+ task_instance_history_dag_version_ids =
select(TaskInstanceHistory.dag_version_id).where(
+ TaskInstanceHistory.dag_id == dag_id,
+ TaskInstanceHistory.run_id == dag_run_id,
+ TaskInstanceHistory.task_id.in_(task_ids),
+ )
+ return (
+ session.execute(
+
task_instance_dag_version_ids.distinct().union(task_instance_history_dag_version_ids.distinct())
+ )
+ .scalars()
+ .all()
+ )
Review Comment:
I just added the `dag_versions_from_task_instance` and
`dag_versions_from_task_instance_history` association proxies to `DagRun` and
also added the `dag_versions` property. However, I'm okay with waiting for
#46565.
If it's not finished by the weekend, I can take care of #46565 then.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]