This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b00d30a187 AIP-84 Migrate GET connection to FastAPI API (#42674)
b00d30a187 is described below
commit b00d30a1875f9b358cd112720131593ca159efd2
Author: Kalyan <[email protected]>
AuthorDate: Fri Oct 4 12:56:36 2024 +0530
AIP-84 Migrate GET connection to FastAPI API (#42674)
* add get connection endpoint
* remove hooks folder
* add test
* add ConnectionResponse model
* add return type to get connection
* add alias to connection_id
* remove 200 status from args
* redact extra field
* add mode to field_validator
* add test for redaction in extra
* remove breakpoint commend
* enable redact for test
---
.../api_connexion/endpoints/connection_endpoint.py | 1 +
airflow/api_fastapi/openapi/v1-generated.yaml | 94 ++++++++++++++++++++++
airflow/api_fastapi/serializers/connections.py | 50 ++++++++++++
airflow/api_fastapi/views/public/connections.py | 18 +++++
airflow/ui/openapi-gen/queries/common.ts | 20 +++++
airflow/ui/openapi-gen/queries/prefetch.ts | 26 +++++-
airflow/ui/openapi-gen/queries/queries.ts | 29 +++++++
airflow/ui/openapi-gen/queries/suspense.ts | 35 +++++++-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 92 +++++++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 28 +++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 45 +++++++++++
tests/api_fastapi/views/public/test_connections.py | 41 ++++++++++
12 files changed, 477 insertions(+), 2 deletions(-)
diff --git a/airflow/api_connexion/endpoints/connection_endpoint.py b/airflow/api_connexion/endpoints/connection_endpoint.py
index b28c9dfcaf..7cea4cf983 100644
--- a/airflow/api_connexion/endpoints/connection_endpoint.py
+++ b/airflow/api_connexion/endpoints/connection_endpoint.py
@@ -75,6 +75,7 @@ def delete_connection(*, connection_id: str, session: Session = NEW_SESSION) ->
return NoContent, HTTPStatus.NO_CONTENT
+@mark_fastapi_migration_done
@security.requires_access_connection("GET")
@provide_session
def get_connection(*, connection_id: str, session: Session = NEW_SESSION) -> APIResponse:
diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml
index 23c4ecf545..d272dd03d9 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -411,8 +411,102 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ get:
+ tags:
+ - Connection
+ summary: Get Connection
+ description: Get a connection entry.
+ operationId: get_connection
+ parameters:
+ - name: connection_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Connection Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
+ ConnectionResponse:
+ properties:
+ conn_id:
+ type: string
+ title: Conn Id
+ conn_type:
+ type: string
+ title: Conn Type
+ description:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Description
+ host:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Host
+ login:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Login
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Schema
+ port:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Port
+ extra:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Extra
+ type: object
+ required:
+ - conn_id
+ - conn_type
+ - description
+ - host
+ - login
+ - schema
+ - port
+ - extra
+ title: ConnectionResponse
+ description: Connection serializer for responses.
DAGCollectionResponse:
properties:
dags:
diff --git a/airflow/api_fastapi/serializers/connections.py b/airflow/api_fastapi/serializers/connections.py
new file mode 100644
index 0000000000..e40b2fa1b2
--- /dev/null
+++ b/airflow/api_fastapi/serializers/connections.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+
+from pydantic import BaseModel, Field, field_validator
+
+from airflow.utils.log.secrets_masker import redact
+
+
+class ConnectionResponse(BaseModel):
+    """Connection serializer for responses."""
+
+    connection_id: str = Field(alias="conn_id")
+    conn_type: str
+    description: str | None
+    host: str | None
+    login: str | None
+    schema_: str | None = Field(alias="schema")
+    port: int | None
+    extra: str | None
+
+    @field_validator("extra", mode="before")
+    @classmethod
+    def redact_extra(cls, v: str | None) -> str | None:
+        if v is None:
+            return None
+        try:
+            extra_dict = json.loads(v)
+            redacted_dict = redact(extra_dict)
+            return json.dumps(redacted_dict)
+        except json.JSONDecodeError:
+            # we can't redact fields in an unstructured `extra`
+            return v
diff --git a/airflow/api_fastapi/views/public/connections.py b/airflow/api_fastapi/views/public/connections.py
index d418e10026..850a017988 100644
--- a/airflow/api_fastapi/views/public/connections.py
+++ b/airflow/api_fastapi/views/public/connections.py
@@ -23,6 +23,7 @@ from typing_extensions import Annotated
from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.serializers.connections import ConnectionResponse
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import Connection
@@ -45,3 +46,20 @@ async def delete_connection(
raise HTTPException(404, f"The Connection with connection_id: `{connection_id}` was not found")
session.delete(connection)
+
+
+@connections_router.get(
+    "/connections/{connection_id}",
+    responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_connection(
+    connection_id: str,
+    session: Annotated[Session, Depends(get_session)],
+) -> ConnectionResponse:
+    """Get a connection entry."""
+    connection = session.scalar(select(Connection).filter_by(conn_id=connection_id))
+
+    if connection is None:
+        raise HTTPException(404, f"The Connection with connection_id: `{connection_id}` was not found")
+
+    return ConnectionResponse.model_validate(connection, from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index f5bc875a56..ff7bb3995c 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -90,6 +90,26 @@ export const UseDagServiceGetDagDetailsKeyFn = (
},
queryKey?: Array<unknown>,
) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])];
+export type ConnectionServiceGetConnectionDefaultResponse = Awaited<
+ ReturnType<typeof ConnectionService.getConnection>
+>;
+export type ConnectionServiceGetConnectionQueryResult<
+ TData = ConnectionServiceGetConnectionDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useConnectionServiceGetConnectionKey =
+ "ConnectionServiceGetConnection";
+export const UseConnectionServiceGetConnectionKeyFn = (
+ {
+ connectionId,
+ }: {
+ connectionId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useConnectionServiceGetConnectionKey,
+ ...(queryKey ?? [{ connectionId }]),
+];
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 210c4f7f77..cbb43cca3a 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1,7 +1,11 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, DagService } from "../requests/services.gen";
+import {
+ AssetService,
+ ConnectionService,
+ DagService,
+} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
import * as Common from "./common";
@@ -114,3 +118,23 @@ export const prefetchUseDagServiceGetDagDetails = (
queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }),
queryFn: () => DagService.getDagDetails({ dagId }),
});
+/**
+ * Get Connection
+ * Get a connection entry.
+ * @param data The data for the request.
+ * @param data.connectionId
+ * @returns ConnectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseConnectionServiceGetConnection = (
+ queryClient: QueryClient,
+ {
+ connectionId,
+ }: {
+ connectionId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseConnectionServiceGetConnectionKeyFn({ connectionId }),
+ queryFn: () => ConnectionService.getConnection({ connectionId }),
+ });
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index f12a6504c8..4aa627d74f 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -144,6 +144,35 @@ export const useDagServiceGetDagDetails = <
queryFn: () => DagService.getDagDetails({ dagId }) as TData,
...options,
});
+/**
+ * Get Connection
+ * Get a connection entry.
+ * @param data The data for the request.
+ * @param data.connectionId
+ * @returns ConnectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useConnectionServiceGetConnection = <
+ TData = Common.ConnectionServiceGetConnectionDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ connectionId,
+ }: {
+ connectionId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseConnectionServiceGetConnectionKeyFn(
+ { connectionId },
+ queryKey,
+ ),
+ queryFn: () => ConnectionService.getConnection({ connectionId }) as TData,
+ ...options,
+ });
/**
* Patch Dags
* Patch multiple DAGs.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index d9d62d35e3..04d7eb94b3 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1,7 +1,11 @@
// generated with @7nohe/[email protected]
import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
-import { AssetService, DagService } from "../requests/services.gen";
+import {
+ AssetService,
+ ConnectionService,
+ DagService,
+} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
import * as Common from "./common";
@@ -135,3 +139,32 @@ export const useDagServiceGetDagDetailsSuspense = <
queryFn: () => DagService.getDagDetails({ dagId }) as TData,
...options,
});
+/**
+ * Get Connection
+ * Get a connection entry.
+ * @param data The data for the request.
+ * @param data.connectionId
+ * @returns ConnectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useConnectionServiceGetConnectionSuspense = <
+ TData = Common.ConnectionServiceGetConnectionDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ connectionId,
+ }: {
+ connectionId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseConnectionServiceGetConnectionKeyFn(
+ { connectionId },
+ queryKey,
+ ),
+ queryFn: () => ConnectionService.getConnection({ connectionId }) as TData,
+ ...options,
+ });
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e8aae616be..910354423b 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1,5 +1,97 @@
// This file is auto-generated by @hey-api/openapi-ts
+export const $ConnectionResponse = {
+ properties: {
+ conn_id: {
+ type: "string",
+ title: "Conn Id",
+ },
+ conn_type: {
+ type: "string",
+ title: "Conn Type",
+ },
+ description: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Description",
+ },
+ host: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Host",
+ },
+ login: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Login",
+ },
+ schema: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Schema",
+ },
+ port: {
+ anyOf: [
+ {
+ type: "integer",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Port",
+ },
+ extra: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Extra",
+ },
+ },
+ type: "object",
+ required: [
+ "conn_id",
+ "conn_type",
+ "description",
+ "host",
+ "login",
+ "schema",
+ "port",
+ "extra",
+ ],
+ title: "ConnectionResponse",
+ description: "Connection serializer for responses.",
+} as const;
+
export const $DAGCollectionResponse = {
properties: {
dags: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 0e91fa4165..023a2a458d 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -15,6 +15,8 @@ import type {
PatchDagResponse,
DeleteConnectionData,
DeleteConnectionResponse,
+ GetConnectionData,
+ GetConnectionResponse,
} from "./types.gen";
export class AssetService {
@@ -217,4 +219,30 @@ export class ConnectionService {
},
});
}
+
+ /**
+ * Get Connection
+ * Get a connection entry.
+ * @param data The data for the request.
+ * @param data.connectionId
+ * @returns ConnectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getConnection(
+ data: GetConnectionData,
+ ): CancelablePromise<GetConnectionResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/connections/{connection_id}",
+ path: {
+ connection_id: data.connectionId,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index b87a172363..65a2db8926 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1,5 +1,19 @@
// This file is auto-generated by @hey-api/openapi-ts
+/**
+ * Connection serializer for responses.
+ */
+export type ConnectionResponse = {
+ conn_id: string;
+ conn_type: string;
+ description: string | null;
+ host: string | null;
+ login: string | null;
+ schema: string | null;
+ port: number | null;
+ extra: string | null;
+};
+
/**
* DAG Collection serializer for responses.
*/
@@ -202,6 +216,12 @@ export type DeleteConnectionData = {
export type DeleteConnectionResponse = void;
+export type GetConnectionData = {
+ connectionId: string;
+};
+
+export type GetConnectionResponse = ConnectionResponse;
+
export type $OpenApiTs = {
"/ui/next_run_assets/{dag_id}": {
get: {
@@ -352,5 +372,30 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
+ get: {
+ req: GetConnectionData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: ConnectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
};
};
diff --git a/tests/api_fastapi/views/public/test_connections.py b/tests/api_fastapi/views/public/test_connections.py
index cfdca1d679..0b76fdbe4f 100644
--- a/tests/api_fastapi/views/public/test_connections.py
+++ b/tests/api_fastapi/views/public/test_connections.py
@@ -61,3 +61,44 @@ class TestDeleteConnection(TestConnectionEndpoint):
assert response.status_code == 404
body = response.json()
assert f"The Connection with connection_id: `{TEST_CONN_ID}` was not found" == body["detail"]
+
+
+class TestGetConnection(TestConnectionEndpoint):
+    def test_get_should_respond_200(self, test_client, session):
+        self.create_connection()
+        response = test_client.get(f"/public/connections/{TEST_CONN_ID}")
+        assert response.status_code == 200
+        body = response.json()
+        assert body["conn_id"] == TEST_CONN_ID
+        assert body["conn_type"] == TEST_CONN_TYPE
+
+    def test_get_should_respond_404(self, test_client):
+        response = test_client.get(f"/public/connections/{TEST_CONN_ID}")
+        assert response.status_code == 404
+        body = response.json()
+        assert f"The Connection with connection_id: `{TEST_CONN_ID}` was not found" == body["detail"]
+
+    def test_get_should_respond_200_with_extra(self, test_client, session):
+        self.create_connection()
+        connection = session.query(Connection).first()
+        connection.extra = '{"extra_key": "extra_value"}'
+        session.commit()
+        response = test_client.get(f"/public/connections/{TEST_CONN_ID}")
+        assert response.status_code == 200
+        body = response.json()
+        assert body["conn_id"] == TEST_CONN_ID
+        assert body["conn_type"] == TEST_CONN_TYPE
+        assert body["extra"] == '{"extra_key": "extra_value"}'
+
+    @pytest.mark.enable_redact
+    def test_get_should_respond_200_with_extra_redacted(self, test_client, session):
+        self.create_connection()
+        connection = session.query(Connection).first()
+        connection.extra = '{"password": "test-password"}'
+        session.commit()
+        response = test_client.get(f"/public/connections/{TEST_CONN_ID}")
+        assert response.status_code == 200
+        body = response.json()
+        assert body["conn_id"] == TEST_CONN_ID
+        assert body["conn_type"] == TEST_CONN_TYPE
+        assert body["extra"] == '{"password": "***"}'