pierrejeambrun commented on code in PR #44301:
URL: https://github.com/apache/airflow/pull/44301#discussion_r1856651335
##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -234,6 +236,69 @@ def get_task_instance_dependencies(
return TaskDependencyCollectionResponse.model_validate({"dependencies":
deps})
+@task_instances_router.get(
+ task_instances_prefix + "/{task_id}/tries",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+)
+def get_task_instance_tries(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ map_index: int = -1,
+) -> TaskInstanceHistoryCollectionResponse:
+ """Get list of task instances history."""
+
+ def _query(orm_object: Base) -> Select:
+ query = select(orm_object).where(
+ orm_object.dag_id == dag_id,
+ orm_object.run_id == dag_run_id,
+ orm_object.task_id == task_id,
+ orm_object.map_index == map_index,
+ )
+ return query
+
+ # Exclude TaskInstance with state UP_FOR_RETRY since they have been
recorded in TaskInstanceHistory
+ tis = session.scalars(
+ _query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY,
TI.state.is_(None)))
+ ).all()
+ task_instance_select = session.scalars(_query(TIH)).all() + tis
Review Comment:
The query has already been executed at this point, so this is no longer a select; the variable should be named `task_instances` instead.
##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -2318,3 +2318,44 @@ def test_raises_404_for_non_existent_dag(self,
test_client):
)
assert response.status_code == 404
assert "DAG non-existent-dag not found" in response.text
+
+
+class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
+ def test_should_respond_200(self, test_client, session):
+ self.create_task_instances(
+ session=session, task_instances=[{"state": State.SUCCESS}],
with_ti_history=True
+ )
+ print("here")
+ response = test_client.get(
+
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries"
+ )
+ assert response.status_code == 200
+ assert response.json()["total_entries"] == 2 # The task instance and
its history
+ assert len(response.json()["task_instances"]) == 2
+
+ def test_ti_in_retry_state_not_returned(self, test_client, session):
+ self.create_task_instances(
+ session=session, task_instances=[{"state": State.SUCCESS}],
with_ti_history=True
+ )
+ ti = session.query(TaskInstance).one()
+ ti.state = State.UP_FOR_RETRY
+ session.merge(ti)
+ session.commit()
+
+ response = test_client.get(
+
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries"
+ )
+ assert response.status_code == 200
+ assert response.json()["total_entries"] == 1
+ assert len(response.json()["task_instances"]) == 1
+
+ def test_raises_404_for_nonexistent_task_instance(self, test_client,
session):
+ self.create_task_instances(session)
+ response = test_client.get(
+
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/non_existent_task/tries"
+ )
+ assert response.status_code == 404
+
+ assert response.json() == {
+ "detail": "The Task Instance with dag_id:
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id:
`non_existent_task` and map_index: `-1` was not found"
+ }
Review Comment:
I think we are missing a test for `mapped_task_instances`. It was named
`test_mapped_task_should_respond_200`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]