uranusjr commented on code in PR #59712:
URL: https://github.com/apache/airflow/pull/59712#discussion_r2641943847
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -876,6 +877,61 @@ def get_task_instance_count(
return count or 0
+@router.get("/previous/{dag_id}/{task_id}", status_code=status.HTTP_200_OK)
+def get_previous_task_instance(
+ dag_id: str,
+ task_id: str,
+ session: SessionDep,
+ logical_date: Annotated[UtcDateTime | None, Query()] = None,
+ run_id: Annotated[str | None, Query()] = None,
+ state: Annotated[TaskInstanceState | None, Query()] = None,
+) -> PreviousTIResponse | None:
+ """
+ Get the previous task instance matching the given criteria.
+
+ :param dag_id: DAG ID (from path)
+ :param task_id: Task ID (from path)
+ :param logical_date: If provided, finds TI with logical_date < this value (before filter)
+ :param run_id: If provided, filters by run_id
+ :param state: If provided, filters by TaskInstance state
+ """
+ query = (
+ select(TI)
+ .join(DR, (TI.dag_id == DR.dag_id) & (TI.run_id == DR.run_id))
+ .options(joinedload(TI.dag_run))
+ .where(TI.dag_id == dag_id, TI.task_id == task_id)
+ .order_by(DR.logical_date.desc())
+ )
Review Comment:
This wouldn’t work very well with dynamic task mapping. I wonder if this API
should return multiple TIs (task instances) instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org