This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch sync_2-10-test-rc2 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 39e666bc9ed3f04533a54fa45cb09cdd203de488 Author: Kalyan R <[email protected]> AuthorDate: Tue Oct 29 20:29:12 2024 +0530 include limit and offset in request body schema for List task instances (batch) endpoint (#43479) * include limit and offset in request body schema for List task instances (batch) endpoint (#42870) * add offset and limit * add offset and limit * add offset and limit * add tests * add test for default offset and limit * fix * Update airflow/api_connexion/endpoints/task_instance_endpoint.py Co-authored-by: Ephraim Anierobi <[email protected]> * Update tests/api_connexion/endpoints/test_task_instance_endpoint.py Co-authored-by: Ephraim Anierobi <[email protected]> * fix test --------- Co-authored-by: Ephraim Anierobi <[email protected]> (cherry picked from commit 7b85d4e6bdc5a9f80cd8ccf1d65c248623b5e1d4) Signed-off-by: kalyanr <[email protected]> * remove spaces --------- Signed-off-by: kalyanr <[email protected]> Co-authored-by: Ephraim Anierobi <[email protected]> (cherry picked from commit bd19bf88a59ae4ec60f6d4dcd4f0fcd718eb9675) --- .../endpoints/task_instance_endpoint.py | 1 + airflow/api_connexion/openapi/v1.yaml | 9 ++++ airflow/www/static/js/types/api-generated.ts | 7 +++ .../endpoints/test_task_instance_endpoint.py | 50 ++++++++++++++++++++++ 4 files changed, 67 insertions(+) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index a9213d2c24..a79af61f69 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -453,6 +453,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse: ti_query = base_query.options( joinedload(TI.rendered_task_instance_fields), joinedload(TI.task_instance_note) ) + ti_query = ti_query.offset(data["page_offset"]).limit(data["page_limit"]) # using execute because we want the SlaMiss entity. 
Scalars don't return None for missing entities task_instances = session.execute(ti_query).all() diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 2f4d08fe72..09cfc7bc67 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -5097,6 +5097,15 @@ components: ListTaskInstanceForm: type: object properties: + page_offset: + type: integer + minimum: 0 + description: The number of items to skip before starting to collect the result set. + page_limit: + type: integer + minimum: 1 + default: 100 + description: The numbers of items to return. dag_ids: type: array items: diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index e55a744a47..fac52cf954 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -2233,6 +2233,13 @@ export interface components { end_date_lte?: string; }; ListTaskInstanceForm: { + /** @description The number of items to skip before starting to collect the result set. */ + page_offset?: number; + /** + * @description The numbers of items to return. + * @default 100 + */ + page_limit?: number; /** * @description Return objects with specific DAG IDs. * The value can be repeated to retrieve multiple matching values (OR condition). 
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 6d04cbf398..330b69c386 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -1106,6 +1106,56 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): assert response.status_code == 400 assert expected in response.json["detail"] + def test_should_respond_200_for_pagination(self, session): + dag_id = "example_python_operator" + + self.create_task_instances( + session, + task_instances=[ + {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(10) + ], + dag_id=dag_id, + ) + + # First 5 items + response_batch1 = self.client.post( + "/api/v1/dags/~/dagRuns/~/taskInstances/list", + environ_overrides={"REMOTE_USER": "test"}, + json={"page_limit": 5, "page_offset": 0}, + ) + assert response_batch1.status_code == 200, response_batch1.json + num_entries_batch1 = len(response_batch1.json["task_instances"]) + assert num_entries_batch1 == 5 + assert len(response_batch1.json["task_instances"]) == 5 + + # 5 items after that + response_batch2 = self.client.post( + "/api/v1/dags/~/dagRuns/~/taskInstances/list", + environ_overrides={"REMOTE_USER": "test"}, + json={"page_limit": 5, "page_offset": 5}, + ) + assert response_batch2.status_code == 200, response_batch2.json + num_entries_batch2 = len(response_batch2.json["task_instances"]) + assert num_entries_batch2 > 0 + assert len(response_batch2.json["task_instances"]) > 0 + + # Match + ti_count = 9 + assert response_batch1.json["total_entries"] == response_batch2.json["total_entries"] == ti_count + assert (num_entries_batch1 + num_entries_batch2) == ti_count + assert response_batch1 != response_batch2 + + # default limit and offset + response_batch3 = self.client.post( + "/api/v1/dags/~/dagRuns/~/taskInstances/list", + environ_overrides={"REMOTE_USER": "test"}, + json={}, 
+ ) + + num_entries_batch3 = len(response_batch3.json["task_instances"]) + assert num_entries_batch3 == ti_count + assert len(response_batch3.json["task_instances"]) == ti_count + class TestPostClearTaskInstances(TestTaskInstanceEndpoint): @pytest.mark.parametrize(
