This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e213178e0d AIP-84 Get Pool (#43221)
e213178e0d is described below
commit e213178e0ddc384b3e4cc053ca723f47c9f1125f
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Oct 22 16:07:47 2024 +0800
AIP-84 Get Pool (#43221)
---
airflow/api_connexion/endpoints/pool_endpoint.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 92 ++++++++++++++++++++++
.../api_fastapi/core_api/routes/public/pools.py | 19 ++++-
airflow/api_fastapi/core_api/serializers/pools.py | 47 +++++++++++
airflow/ui/openapi-gen/queries/common.ts | 16 ++++
airflow/ui/openapi-gen/queries/prefetch.ts | 21 +++++
airflow/ui/openapi-gen/queries/queries.ts | 26 ++++++
airflow/ui/openapi-gen/queries/suspense.ts | 27 +++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 67 ++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 26 ++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 47 +++++++++++
.../core_api/routes/public/test_pools.py | 25 ++++++
12 files changed, 413 insertions(+), 1 deletion(-)
diff --git a/airflow/api_connexion/endpoints/pool_endpoint.py
b/airflow/api_connexion/endpoints/pool_endpoint.py
index 6ea28f7457..1f39297c36 100644
--- a/airflow/api_connexion/endpoints/pool_endpoint.py
+++ b/airflow/api_connexion/endpoints/pool_endpoint.py
@@ -55,6 +55,7 @@ def delete_pool(*, pool_name: str, session: Session =
NEW_SESSION) -> APIRespons
return Response(status=HTTPStatus.NO_CONTENT)
+@mark_fastapi_migration_done
@security.requires_access_pool("GET")
@provide_session
def get_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIResponse:
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index ebe209334d..3dc9f35443 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1119,6 +1119,50 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ get:
+ tags:
+ - Pool
+ summary: Get Pool
+ description: Get a pool.
+ operationId: get_pool
+ parameters:
+ - name: pool_name
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Pool Name
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/PoolResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/providers/:
get:
tags:
@@ -1916,6 +1960,54 @@ components:
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
+ PoolResponse:
+ properties:
+ name:
+ type: string
+ title: Name
+ slots:
+ type: integer
+ title: Slots
+ description:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Description
+ include_deferred:
+ type: boolean
+ title: Include Deferred
+ occupied_slots:
+ type: integer
+ title: Occupied Slots
+ running_slots:
+ type: integer
+ title: Running Slots
+ queued_slots:
+ type: integer
+ title: Queued Slots
+ scheduled_slots:
+ type: integer
+ title: Scheduled Slots
+ open_slots:
+ type: integer
+ title: Open Slots
+ deferred_slots:
+ type: integer
+ title: Deferred Slots
+ type: object
+ required:
+ - name
+ - slots
+ - description
+ - include_deferred
+ - occupied_slots
+ - running_slots
+ - queued_slots
+ - scheduled_slots
+ - open_slots
+ - deferred_slots
+ title: PoolResponse
+ description: Pool serializer for responses.
ProviderCollectionResponse:
properties:
providers:
diff --git a/airflow/api_fastapi/core_api/routes/public/pools.py
b/airflow/api_fastapi/core_api/routes/public/pools.py
index 2b4ffc0263..0d27f842b1 100644
--- a/airflow/api_fastapi/core_api/routes/public/pools.py
+++ b/airflow/api_fastapi/core_api/routes/public/pools.py
@@ -17,13 +17,14 @@
from __future__ import annotations
from fastapi import Depends, HTTPException
-from sqlalchemy import delete
+from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.serializers.pools import PoolResponse
from airflow.models.pool import Pool
pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
@@ -46,3 +47,19 @@ async def delete_pool(
if affected_count == 0:
raise HTTPException(404, f"The Pool with name: `{pool_name}` was not
found")
+
+
+@pools_router.get(
+ "/{pool_name}",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_pool(
+ pool_name: str,
+ session: Annotated[Session, Depends(get_session)],
+) -> PoolResponse:
+ """Get a pool."""
+ pool = session.scalar(select(Pool).where(Pool.pool == pool_name))
+ if pool is None:
+ raise HTTPException(404, f"The Pool with name: `{pool_name}` was not
found")
+
+ return PoolResponse.model_validate(pool, from_attributes=True)
diff --git a/airflow/api_fastapi/core_api/serializers/pools.py
b/airflow/api_fastapi/core_api/serializers/pools.py
new file mode 100644
index 0000000000..e0b03fd8c1
--- /dev/null
+++ b/airflow/api_fastapi/core_api/serializers/pools.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Annotated, Callable
+
+from pydantic import BaseModel, BeforeValidator, Field
+
+
+def _call_function(function: Callable[[], int]) -> int:
+ """
+ Call the given function.
+
+ Used for the BeforeValidator to get the actual values from the bound
method.
+ """
+ return function()
+
+
+class PoolResponse(BaseModel):
+ """Pool serializer for responses."""
+
+ pool: str = Field(serialization_alias="name")
+ slots: int
+ description: str | None
+ include_deferred: bool
+
+ occupied_slots: Annotated[int, BeforeValidator(_call_function)]
+ running_slots: Annotated[int, BeforeValidator(_call_function)]
+ queued_slots: Annotated[int, BeforeValidator(_call_function)]
+ scheduled_slots: Annotated[int, BeforeValidator(_call_function)]
+ open_slots: Annotated[int, BeforeValidator(_call_function)]
+ deferred_slots: Annotated[int, BeforeValidator(_call_function)]
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 5e950de844..ae21f16a8d 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -272,6 +272,22 @@ export const UseMonitorServiceGetHealthKeyFn = (queryKey?:
Array<unknown>) => [
useMonitorServiceGetHealthKey,
...(queryKey ?? []),
];
+export type PoolServiceGetPoolDefaultResponse = Awaited<
+ ReturnType<typeof PoolService.getPool>
+>;
+export type PoolServiceGetPoolQueryResult<
+ TData = PoolServiceGetPoolDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const usePoolServiceGetPoolKey = "PoolServiceGetPool";
+export const UsePoolServiceGetPoolKeyFn = (
+ {
+ poolName,
+ }: {
+ poolName: string;
+ },
+ queryKey?: Array<unknown>,
+) => [usePoolServiceGetPoolKey, ...(queryKey ?? [{ poolName }])];
export type ProviderServiceGetProvidersDefaultResponse = Awaited<
ReturnType<typeof ProviderService.getProviders>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 8807201d42..d352f29047 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -8,6 +8,7 @@ import {
DagService,
DashboardService,
MonitorService,
+ PoolService,
ProviderService,
VariableService,
} from "../requests/services.gen";
@@ -336,6 +337,26 @@ export const prefetchUseMonitorServiceGetHealth =
(queryClient: QueryClient) =>
queryKey: Common.UseMonitorServiceGetHealthKeyFn(),
queryFn: () => MonitorService.getHealth(),
});
+/**
+ * Get Pool
+ * Get a pool.
+ * @param data The data for the request.
+ * @param data.poolName
+ * @returns PoolResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUsePoolServiceGetPool = (
+ queryClient: QueryClient,
+ {
+ poolName,
+ }: {
+ poolName: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }),
+ queryFn: () => PoolService.getPool({ poolName }),
+ });
/**
* Get Providers
* Get providers.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index ac6939d9f6..0cc21b2ee8 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -432,6 +432,32 @@ export const useMonitorServiceGetHealth = <
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
+/**
+ * Get Pool
+ * Get a pool.
+ * @param data The data for the request.
+ * @param data.poolName
+ * @returns PoolResponse Successful Response
+ * @throws ApiError
+ */
+export const usePoolServiceGetPool = <
+ TData = Common.PoolServiceGetPoolDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ poolName,
+ }: {
+ poolName: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }, queryKey),
+ queryFn: () => PoolService.getPool({ poolName }) as TData,
+ ...options,
+ });
/**
* Get Providers
* Get providers.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 1e49741889..c4f05bf5d8 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -8,6 +8,7 @@ import {
DagService,
DashboardService,
MonitorService,
+ PoolService,
ProviderService,
VariableService,
} from "../requests/services.gen";
@@ -426,6 +427,32 @@ export const useMonitorServiceGetHealthSuspense = <
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
+/**
+ * Get Pool
+ * Get a pool.
+ * @param data The data for the request.
+ * @param data.poolName
+ * @returns PoolResponse Successful Response
+ * @throws ApiError
+ */
+export const usePoolServiceGetPoolSuspense = <
+ TData = Common.PoolServiceGetPoolDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ poolName,
+ }: {
+ poolName: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }, queryKey),
+ queryFn: () => PoolService.getPool({ poolName }) as TData,
+ ...options,
+ });
/**
* Get Providers
* Get providers.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 79a55e7b08..e8607e24cd 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1189,6 +1189,73 @@ export const $HistoricalMetricDataResponse = {
description: "Historical Metric Data serializer for responses.",
} as const;
+export const $PoolResponse = {
+ properties: {
+ name: {
+ type: "string",
+ title: "Name",
+ },
+ slots: {
+ type: "integer",
+ title: "Slots",
+ },
+ description: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Description",
+ },
+ include_deferred: {
+ type: "boolean",
+ title: "Include Deferred",
+ },
+ occupied_slots: {
+ type: "integer",
+ title: "Occupied Slots",
+ },
+ running_slots: {
+ type: "integer",
+ title: "Running Slots",
+ },
+ queued_slots: {
+ type: "integer",
+ title: "Queued Slots",
+ },
+ scheduled_slots: {
+ type: "integer",
+ title: "Scheduled Slots",
+ },
+ open_slots: {
+ type: "integer",
+ title: "Open Slots",
+ },
+ deferred_slots: {
+ type: "integer",
+ title: "Deferred Slots",
+ },
+ },
+ type: "object",
+ required: [
+ "name",
+ "slots",
+ "description",
+ "include_deferred",
+ "occupied_slots",
+ "running_slots",
+ "queued_slots",
+ "scheduled_slots",
+ "open_slots",
+ "deferred_slots",
+ ],
+ title: "PoolResponse",
+ description: "Pool serializer for responses.",
+} as const;
+
export const $ProviderCollectionResponse = {
properties: {
providers: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index d6f46b283f..c77e039bb8 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -44,6 +44,8 @@ import type {
GetHealthResponse,
DeletePoolData,
DeletePoolResponse,
+ GetPoolData,
+ GetPoolResponse,
GetProvidersData,
GetProvidersResponse,
} from "./types.gen";
@@ -657,6 +659,30 @@ export class PoolService {
},
});
}
+
+ /**
+ * Get Pool
+ * Get a pool.
+ * @param data The data for the request.
+ * @param data.poolName
+ * @returns PoolResponse Successful Response
+ * @throws ApiError
+ */
+ public static getPool(data: GetPoolData): CancelablePromise<GetPoolResponse>
{
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/pools/{pool_name}",
+ path: {
+ pool_name: data.poolName,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class ProviderService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 83030dfb0d..41b6f92350 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -266,6 +266,22 @@ export type HistoricalMetricDataResponse = {
task_instance_states: TaskInstanceState;
};
+/**
+ * Pool serializer for responses.
+ */
+export type PoolResponse = {
+ name: string;
+ slots: number;
+ description: string | null;
+ include_deferred: boolean;
+ occupied_slots: number;
+ running_slots: number;
+ queued_slots: number;
+ scheduled_slots: number;
+ open_slots: number;
+ deferred_slots: number;
+};
+
/**
* Provider Collection serializer for responses.
*/
@@ -506,6 +522,12 @@ export type DeletePoolData = {
export type DeletePoolResponse = void;
+export type GetPoolData = {
+ poolName: string;
+};
+
+export type GetPoolResponse = PoolResponse;
+
export type GetProvidersData = {
limit?: number;
offset?: number;
@@ -1037,6 +1059,31 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
+ get: {
+ req: GetPoolData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: PoolResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
};
"/public/providers/": {
get: {
diff --git a/tests/api_fastapi/core_api/routes/public/test_pools.py
b/tests/api_fastapi/core_api/routes/public/test_pools.py
index 4a5c511b8d..d75ad6e417 100644
--- a/tests/api_fastapi/core_api/routes/public/test_pools.py
+++ b/tests/api_fastapi/core_api/routes/public/test_pools.py
@@ -76,3 +76,28 @@ class TestDeletePool(TestPools):
assert response.status_code == 404
body = response.json()
assert f"The Pool with name: `{POOL1_NAME}` was not found" ==
body["detail"]
+
+
+class TestGetPool(TestPools):
+ def test_get_should_respond_200(self, test_client, session):
+ self.create_pools()
+ response = test_client.get(f"/public/pools/{POOL1_NAME}")
+ assert response.status_code == 200
+ assert response.json() == {
+ "deferred_slots": 0,
+ "description": None,
+ "include_deferred": True,
+ "name": "pool1",
+ "occupied_slots": 0,
+ "open_slots": 3,
+ "queued_slots": 0,
+ "running_slots": 0,
+ "scheduled_slots": 0,
+ "slots": 3,
+ }
+
+ def test_get_should_respond_404(self, test_client):
+ response = test_client.get(f"/public/pools/{POOL1_NAME}")
+ assert response.status_code == 404
+ body = response.json()
+ assert f"The Pool with name: `{POOL1_NAME}` was not found" ==
body["detail"]