This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b44a889f19 AIP-84 Get Pools (#43223)
b44a889f19 is described below
commit b44a889f19dcf9d6b47636feea14ef23378f85bb
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Oct 22 22:34:02 2024 +0800
AIP-84 Get Pools (#43223)
---
airflow/api_connexion/endpoints/pool_endpoint.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 78 +++++++++++++++++++++-
.../api_fastapi/core_api/routes/public/pools.py | 36 +++++++++-
.../core_api/serializers/connections.py | 2 +-
airflow/api_fastapi/core_api/serializers/pools.py | 9 ++-
airflow/ui/openapi-gen/queries/common.ts | 20 ++++++
airflow/ui/openapi-gen/queries/prefetch.ts | 26 ++++++++
airflow/ui/openapi-gen/queries/queries.ts | 35 ++++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 35 ++++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 22 +++++-
airflow/ui/openapi-gen/requests/services.gen.ts | 32 +++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 45 ++++++++++++-
.../core_api/routes/public/test_connections.py | 2 +-
.../core_api/routes/public/test_pools.py | 31 ++++++++-
14 files changed, 363 insertions(+), 11 deletions(-)
diff --git a/airflow/api_connexion/endpoints/pool_endpoint.py
b/airflow/api_connexion/endpoints/pool_endpoint.py
index 1f39297c36..2e62ce0f3d 100644
--- a/airflow/api_connexion/endpoints/pool_endpoint.py
+++ b/airflow/api_connexion/endpoints/pool_endpoint.py
@@ -66,6 +66,7 @@ def get_pool(*, pool_name: str, session: Session =
NEW_SESSION) -> APIResponse:
return pool_schema.dump(obj)
+@mark_fastapi_migration_done
@security.requires_access_pool("GET")
@format_parameters({"limit": check_limit})
@provide_session
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 3dc9f35443..0f862e496e 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1163,6 +1163,66 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/pools/:
+ get:
+ tags:
+ - Pool
+ summary: Get Pools
+ description: Get all pools entries.
+ operationId: get_pools
+ parameters:
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ default: 100
+ title: Limit
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ default: 0
+ title: Offset
+ - name: order_by
+ in: query
+ required: false
+ schema:
+ type: string
+ default: id
+ title: Order By
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/PoolCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/providers/:
get:
tags:
@@ -1227,7 +1287,7 @@ components:
- connections
- total_entries
title: ConnectionCollectionResponse
- description: DAG Collection serializer for responses.
+ description: Connection Collection serializer for responses.
ConnectionResponse:
properties:
connection_id:
@@ -1960,6 +2020,22 @@ components:
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
+ PoolCollectionResponse:
+ properties:
+ pools:
+ items:
+ $ref: '#/components/schemas/PoolResponse'
+ type: array
+ title: Pools
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - pools
+ - total_entries
+ title: PoolCollectionResponse
+ description: Pool Collection serializer for responses.
PoolResponse:
properties:
name:
diff --git a/airflow/api_fastapi/core_api/routes/public/pools.py
b/airflow/api_fastapi/core_api/routes/public/pools.py
index 0d27f842b1..0f5329a1cc 100644
--- a/airflow/api_fastapi/core_api/routes/public/pools.py
+++ b/airflow/api_fastapi/core_api/routes/public/pools.py
@@ -21,10 +21,11 @@ from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
-from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.db.common import get_session, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset,
SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
-from airflow.api_fastapi.core_api.serializers.pools import PoolResponse
+from airflow.api_fastapi.core_api.serializers.pools import
PoolCollectionResponse, PoolResponse
from airflow.models.pool import Pool
pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
@@ -63,3 +64,34 @@ async def get_pool(
raise HTTPException(404, f"The Pool with name: `{pool_name}` was not
found")
return PoolResponse.model_validate(pool, from_attributes=True)
+
+
+@pools_router.get(
+ "/",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_pools(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ order_by: Annotated[
+ SortParam,
+ Depends(SortParam(["id", "name"], Pool).dynamic_depends()),
+ ],
+ session: Annotated[Session, Depends(get_session)],
+) -> PoolCollectionResponse:
+ """Get all pools entries."""
+ pools_select, total_entries = paginated_select(
+ select(Pool),
+ [],
+ order_by=order_by,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+
+ pools = session.scalars(pools_select).all()
+
+ return PoolCollectionResponse(
+ pools=[PoolResponse.model_validate(pool, from_attributes=True) for
pool in pools],
+ total_entries=total_entries,
+ )
diff --git a/airflow/api_fastapi/core_api/serializers/connections.py
b/airflow/api_fastapi/core_api/serializers/connections.py
index 1c80160729..1cc069cac0 100644
--- a/airflow/api_fastapi/core_api/serializers/connections.py
+++ b/airflow/api_fastapi/core_api/serializers/connections.py
@@ -51,7 +51,7 @@ class ConnectionResponse(BaseModel):
class ConnectionCollectionResponse(BaseModel):
- """DAG Collection serializer for responses."""
+ """Connection Collection serializer for responses."""
connections: list[ConnectionResponse]
total_entries: int
diff --git a/airflow/api_fastapi/core_api/serializers/pools.py
b/airflow/api_fastapi/core_api/serializers/pools.py
index e0b03fd8c1..4bfa7137f1 100644
--- a/airflow/api_fastapi/core_api/serializers/pools.py
+++ b/airflow/api_fastapi/core_api/serializers/pools.py
@@ -34,7 +34,7 @@ def _call_function(function: Callable[[], int]) -> int:
class PoolResponse(BaseModel):
"""Pool serializer for responses."""
- pool: str = Field(serialization_alias="name")
+ pool: str = Field(serialization_alias="name", validation_alias="pool")
slots: int
description: str | None
include_deferred: bool
@@ -45,3 +45,10 @@ class PoolResponse(BaseModel):
scheduled_slots: Annotated[int, BeforeValidator(_call_function)]
open_slots: Annotated[int, BeforeValidator(_call_function)]
deferred_slots: Annotated[int, BeforeValidator(_call_function)]
+
+
+class PoolCollectionResponse(BaseModel):
+ """Pool Collection serializer for responses."""
+
+ pools: list[PoolResponse]
+ total_entries: int
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index ae21f16a8d..41b1ff86f1 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -288,6 +288,26 @@ export const UsePoolServiceGetPoolKeyFn = (
},
queryKey?: Array<unknown>,
) => [usePoolServiceGetPoolKey, ...(queryKey ?? [{ poolName }])];
+export type PoolServiceGetPoolsDefaultResponse = Awaited<
+ ReturnType<typeof PoolService.getPools>
+>;
+export type PoolServiceGetPoolsQueryResult<
+ TData = PoolServiceGetPoolsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const usePoolServiceGetPoolsKey = "PoolServiceGetPools";
+export const UsePoolServiceGetPoolsKeyFn = (
+ {
+ limit,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+ queryKey?: Array<unknown>,
+) => [usePoolServiceGetPoolsKey, ...(queryKey ?? [{ limit, offset, orderBy
}])];
export type ProviderServiceGetProvidersDefaultResponse = Awaited<
ReturnType<typeof ProviderService.getProviders>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index d352f29047..2e2e9b0e3c 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -357,6 +357,32 @@ export const prefetchUsePoolServiceGetPool = (
queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }),
queryFn: () => PoolService.getPool({ poolName }),
});
+/**
+ * Get Pools
+ * Get all pools entries.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @returns PoolCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUsePoolServiceGetPools = (
+ queryClient: QueryClient,
+ {
+ limit,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UsePoolServiceGetPoolsKeyFn({ limit, offset, orderBy }),
+ queryFn: () => PoolService.getPools({ limit, offset, orderBy }),
+ });
/**
* Get Providers
* Get providers.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 0cc21b2ee8..5e05eab973 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -458,6 +458,41 @@ export const usePoolServiceGetPool = <
queryFn: () => PoolService.getPool({ poolName }) as TData,
...options,
});
+/**
+ * Get Pools
+ * Get all pools entries.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @returns PoolCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const usePoolServiceGetPools = <
+ TData = Common.PoolServiceGetPoolsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ limit,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UsePoolServiceGetPoolsKeyFn(
+ { limit, offset, orderBy },
+ queryKey,
+ ),
+ queryFn: () => PoolService.getPools({ limit, offset, orderBy }) as TData,
+ ...options,
+ });
/**
* Get Providers
* Get providers.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index c4f05bf5d8..8f032bacd4 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -453,6 +453,41 @@ export const usePoolServiceGetPoolSuspense = <
queryFn: () => PoolService.getPool({ poolName }) as TData,
...options,
});
+/**
+ * Get Pools
+ * Get all pools entries.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @returns PoolCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const usePoolServiceGetPoolsSuspense = <
+ TData = Common.PoolServiceGetPoolsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ limit,
+ offset,
+ orderBy,
+ }: {
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ } = {},
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UsePoolServiceGetPoolsKeyFn(
+ { limit, offset, orderBy },
+ queryKey,
+ ),
+ queryFn: () => PoolService.getPools({ limit, offset, orderBy }) as TData,
+ ...options,
+ });
/**
* Get Providers
* Get providers.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e8607e24cd..1556ae2541 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -37,7 +37,7 @@ export const $ConnectionCollectionResponse = {
type: "object",
required: ["connections", "total_entries"],
title: "ConnectionCollectionResponse",
- description: "DAG Collection serializer for responses.",
+ description: "Connection Collection serializer for responses.",
} as const;
export const $ConnectionResponse = {
@@ -1189,6 +1189,26 @@ export const $HistoricalMetricDataResponse = {
description: "Historical Metric Data serializer for responses.",
} as const;
+export const $PoolCollectionResponse = {
+ properties: {
+ pools: {
+ items: {
+ $ref: "#/components/schemas/PoolResponse",
+ },
+ type: "array",
+ title: "Pools",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["pools", "total_entries"],
+ title: "PoolCollectionResponse",
+ description: "Pool Collection serializer for responses.",
+} as const;
+
export const $PoolResponse = {
properties: {
name: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index c77e039bb8..45c5c98526 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -46,6 +46,8 @@ import type {
DeletePoolResponse,
GetPoolData,
GetPoolResponse,
+ GetPoolsData,
+ GetPoolsResponse,
GetProvidersData,
GetProvidersResponse,
} from "./types.gen";
@@ -683,6 +685,36 @@ export class PoolService {
},
});
}
+
+ /**
+ * Get Pools
+ * Get all pools entries.
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @param data.orderBy
+ * @returns PoolCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getPools(
+ data: GetPoolsData = {},
+ ): CancelablePromise<GetPoolsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/pools/",
+ query: {
+ limit: data.limit,
+ offset: data.offset,
+ order_by: data.orderBy,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class ProviderService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 41b6f92350..cf70c15ed6 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -8,7 +8,7 @@ export type BaseInfoSchema = {
};
/**
- * DAG Collection serializer for responses.
+ * Connection Collection serializer for responses.
*/
export type ConnectionCollectionResponse = {
connections: Array<ConnectionResponse>;
@@ -266,6 +266,14 @@ export type HistoricalMetricDataResponse = {
task_instance_states: TaskInstanceState;
};
+/**
+ * Pool Collection serializer for responses.
+ */
+export type PoolCollectionResponse = {
+ pools: Array<PoolResponse>;
+ total_entries: number;
+};
+
/**
* Pool serializer for responses.
*/
@@ -528,6 +536,14 @@ export type GetPoolData = {
export type GetPoolResponse = PoolResponse;
+export type GetPoolsData = {
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+};
+
+export type GetPoolsResponse = PoolCollectionResponse;
+
export type GetProvidersData = {
limit?: number;
offset?: number;
@@ -1085,6 +1101,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/pools/": {
+ get: {
+ req: GetPoolsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: PoolCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/providers/": {
get: {
req: GetProvidersData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_connections.py
b/tests/api_fastapi/core_api/routes/public/test_connections.py
index 8c2e3b6d58..ee9c80219e 100644
--- a/tests/api_fastapi/core_api/routes/public/test_connections.py
+++ b/tests/api_fastapi/core_api/routes/public/test_connections.py
@@ -168,4 +168,4 @@ class TestGetConnections(TestConnectionEndpoint):
body = response.json()
assert body["total_entries"] == expected_total_entries
- assert [dag["connection_id"] for dag in body["connections"]] ==
expected_ids
+ assert [connection["connection_id"] for connection in
body["connections"]] == expected_ids
diff --git a/tests/api_fastapi/core_api/routes/public/test_pools.py
b/tests/api_fastapi/core_api/routes/public/test_pools.py
index d75ad6e417..e97f85b95d 100644
--- a/tests/api_fastapi/core_api/routes/public/test_pools.py
+++ b/tests/api_fastapi/core_api/routes/public/test_pools.py
@@ -43,7 +43,7 @@ def _create_pools(session) -> None:
session.add_all([pool1, pool2])
-class TestPools:
+class TestPoolsEndpoint:
@pytest.fixture(autouse=True)
def setup(self) -> None:
clear_db_pools()
@@ -55,7 +55,7 @@ class TestPools:
_create_pools()
-class TestDeletePool(TestPools):
+class TestDeletePool(TestPoolsEndpoint):
def test_delete_should_respond_204(self, test_client, session):
self.create_pools()
pools = session.query(Pool).all()
@@ -78,7 +78,7 @@ class TestDeletePool(TestPools):
assert f"The Pool with name: `{POOL1_NAME}` was not found" ==
body["detail"]
-class TestGetPool(TestPools):
+class TestGetPool(TestPoolsEndpoint):
def test_get_should_respond_200(self, test_client, session):
self.create_pools()
response = test_client.get(f"/public/pools/{POOL1_NAME}")
@@ -101,3 +101,28 @@ class TestGetPool(TestPools):
assert response.status_code == 404
body = response.json()
assert f"The Pool with name: `{POOL1_NAME}` was not found" ==
body["detail"]
+
+
+class TestGetPools(TestPoolsEndpoint):
+ @pytest.mark.parametrize(
+ "query_params, expected_total_entries, expected_ids",
+ [
+ # Filters
+ ({}, 3, [Pool.DEFAULT_POOL_NAME, POOL1_NAME, POOL2_NAME]),
+ ({"limit": 1}, 3, [Pool.DEFAULT_POOL_NAME]),
+ ({"limit": 1, "offset": 1}, 3, [POOL1_NAME]),
+ # Sort
+ ({"order_by": "-id"}, 3, [POOL2_NAME, POOL1_NAME,
Pool.DEFAULT_POOL_NAME]),
+ ({"order_by": "id"}, 3, [Pool.DEFAULT_POOL_NAME, POOL1_NAME,
POOL2_NAME]),
+ ],
+ )
+ def test_should_respond_200(
+ self, test_client, session, query_params, expected_total_entries,
expected_ids
+ ):
+ self.create_pools()
+ response = test_client.get("/public/pools/", params=query_params)
+ assert response.status_code == 200
+
+ body = response.json()
+ assert body["total_entries"] == expected_total_entries
+ assert [pool["name"] for pool in body["pools"]] == expected_ids