This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a18bcd7a1b5 AIP-84 Migrate XCom get entries endpoint to Fastapi
(#44366)
a18bcd7a1b5 is described below
commit a18bcd7a1b54062d248c025d8a8148d4e859a9ae
Author: michaeljs-c <[email protected]>
AuthorDate: Tue Nov 26 20:21:09 2024 +0000
AIP-84 Migrate XCom get entries endpoint to Fastapi (#44366)
---
airflow/api_connexion/endpoints/xcom_endpoint.py | 1 +
airflow/api_fastapi/core_api/datamodels/xcom.py | 7 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 336 +++++++++++++++------
.../api_fastapi/core_api/routes/public/__init__.py | 2 +-
airflow/api_fastapi/core_api/routes/public/xcom.py | 54 +++-
airflow/ui/openapi-gen/queries/common.ts | 99 ++++--
airflow/ui/openapi-gen/queries/prefetch.ts | 167 ++++++----
airflow/ui/openapi-gen/queries/queries.ts | 170 +++++++----
airflow/ui/openapi-gen/queries/suspense.ts | 170 +++++++----
airflow/ui/openapi-gen/requests/schemas.gen.ts | 62 ++++
airflow/ui/openapi-gen/requests/services.gen.ts | 135 ++++++---
airflow/ui/openapi-gen/requests/types.gen.ts | 149 ++++++---
.../core_api/routes/public/test_xcom.py | 297 +++++++++++++++++-
13 files changed, 1256 insertions(+), 393 deletions(-)
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index c86617391ab..cb3faf7379e 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -45,6 +45,7 @@ if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse
+@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.XCOM)
@format_parameters({"limit": check_limit})
@provide_session
diff --git a/airflow/api_fastapi/core_api/datamodels/xcom.py
b/airflow/api_fastapi/core_api/datamodels/xcom.py
index 370aa651cb2..4e3c6f54a7a 100644
--- a/airflow/api_fastapi/core_api/datamodels/xcom.py
+++ b/airflow/api_fastapi/core_api/datamodels/xcom.py
@@ -49,3 +49,10 @@ class XComResponseString(XComResponse):
@field_validator("value", mode="before")
def value_to_string(cls, v):
return str(v) if v is not None else None
+
+
+class XComCollection(BaseModel):
+ """List of XCom items."""
+
+ xcom_entries: list[XComResponse]
+ total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 2610dbcf612..c53f68a8438 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -3651,6 +3651,200 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
+ get:
+ tags:
+ - XCom
+ summary: Get Xcom Entry
+ description: Get an XCom entry.
+ operationId: get_xcom_entry
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: xcom_key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Xcom Key
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: -1
+ default: -1
+ title: Map Index
+ - name: deserialize
+ in: query
+ required: false
+ schema:
+ type: boolean
+ default: false
+ title: Deserialize
+ - name: stringify
+ in: query
+ required: false
+ schema:
+ type: boolean
+ default: true
+ title: Stringify
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ anyOf:
+ - $ref: '#/components/schemas/XComResponseNative'
+ - $ref: '#/components/schemas/XComResponseString'
+ title: Response Get Xcom Entry
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries:
+ get:
+ tags:
+ - XCom
+ summary: Get Xcom Entries
+ description: 'Get all XCom entries.
+
+
+ This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id
to
+ retrieve XCom entries for all DAGs.'
+ operationId: get_xcom_entries
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: xcom_key
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Xcom Key
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: integer
+ minimum: -1
+ - type: 'null'
+ title: Map Index
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 100
+ title: Limit
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 0
+ title: Offset
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/XComCollection'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}:
get:
tags:
@@ -5199,100 +5393,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
-
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
- get:
- tags:
- - XCom
- summary: Get Xcom Entry
- description: Get an XCom entry.
- operationId: get_xcom_entry
- parameters:
- - name: dag_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Id
- - name: task_id
- in: path
- required: true
- schema:
- type: string
- title: Task Id
- - name: dag_run_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Run Id
- - name: xcom_key
- in: path
- required: true
- schema:
- type: string
- title: Xcom Key
- - name: map_index
- in: query
- required: false
- schema:
- type: integer
- minimum: -1
- default: -1
- title: Map Index
- - name: deserialize
- in: query
- required: false
- schema:
- type: boolean
- default: false
- title: Deserialize
- - name: stringify
- in: query
- required: false
- schema:
- type: boolean
- default: true
- title: Stringify
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- anyOf:
- - $ref: '#/components/schemas/XComResponseNative'
- - $ref: '#/components/schemas/XComResponseString'
- title: Response Get Xcom Entry
- '401':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Unauthorized
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
get:
tags:
@@ -8715,6 +8815,54 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
+ XComCollection:
+ properties:
+ xcom_entries:
+ items:
+ $ref: '#/components/schemas/XComResponse'
+ type: array
+ title: Xcom Entries
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - xcom_entries
+ - total_entries
+ title: XComCollection
+ description: List of XCom items.
+ XComResponse:
+ properties:
+ key:
+ type: string
+ title: Key
+ timestamp:
+ type: string
+ format: date-time
+ title: Timestamp
+ logical_date:
+ type: string
+ format: date-time
+ title: Logical Date
+ map_index:
+ type: integer
+ title: Map Index
+ task_id:
+ type: string
+ title: Task Id
+ dag_id:
+ type: string
+ title: Dag Id
+ type: object
+ required:
+ - key
+ - timestamp
+ - logical_date
+ - map_index
+ - task_id
+ - dag_id
+ title: XComResponse
+ description: Serializer for a xcom item.
XComResponseNative:
properties:
key:
diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py
b/airflow/api_fastapi/core_api/routes/public/__init__.py
index e2dbed54f71..3e05eb87680 100644
--- a/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -68,10 +68,10 @@ authenticated_router.include_router(job_router)
authenticated_router.include_router(plugins_router)
authenticated_router.include_router(pools_router)
authenticated_router.include_router(providers_router)
+authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_router)
authenticated_router.include_router(tasks_router)
authenticated_router.include_router(variables_router)
-authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_log_router)
diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py
b/airflow/api_fastapi/core_api/routes/public/xcom.py
index 3a8e6130e45..1d4b154fd87 100644
--- a/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -23,9 +23,11 @@ from fastapi import Depends, HTTPException, Query, status
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
-from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.db.common import get_session, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.xcom import (
+ XComCollection,
XComResponseNative,
XComResponseString,
)
@@ -90,5 +92,53 @@ def get_xcom_entry(
if stringify:
return XComResponseString.model_validate(item)
-
return XComResponseNative.model_validate(item)
+
+
+@xcom_router.get(
+ "",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_xcom_entries(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ limit: QueryLimit,
+ offset: QueryOffset,
+ session: Annotated[Session, Depends(get_session)],
+ xcom_key: Annotated[str | None, Query()] = None,
+ map_index: Annotated[int | None, Query(ge=-1)] = None,
+) -> XComCollection:
+ """
+ Get all XCom entries.
+
+ This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
retrieve XCom entries for all DAGs.
+ """
+ query = select(XCom)
+ if dag_id != "~":
+ query = query.where(XCom.dag_id == dag_id)
+ query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id ==
DR.run_id))
+
+ if task_id != "~":
+ query = query.where(XCom.task_id == task_id)
+ if dag_run_id != "~":
+ query = query.where(DR.run_id == dag_run_id)
+ if map_index is not None:
+ query = query.where(XCom.map_index == map_index)
+ if xcom_key is not None:
+ query = query.where(XCom.key == xcom_key)
+
+ query, total_entries = paginated_select(
+ statement=query,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+ query = query.order_by(XCom.dag_id, XCom.task_id, XCom.run_id,
XCom.map_index, XCom.key)
+ xcoms = session.scalars(query)
+ return XComCollection(xcom_entries=xcoms, total_entries=total_entries)
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 4696713d203..e4bc5f300d8 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1393,6 +1393,72 @@ export const UseProviderServiceGetProvidersKeyFn = (
} = {},
queryKey?: Array<unknown>,
) => [useProviderServiceGetProvidersKey, ...(queryKey ?? [{ limit, offset }])];
+export type XcomServiceGetXcomEntryDefaultResponse = Awaited<
+ ReturnType<typeof XcomService.getXcomEntry>
+>;
+export type XcomServiceGetXcomEntryQueryResult<
+ TData = XcomServiceGetXcomEntryDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry";
+export const UseXcomServiceGetXcomEntryKeyFn = (
+ {
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ deserialize?: boolean;
+ mapIndex?: number;
+ stringify?: boolean;
+ taskId: string;
+ xcomKey: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useXcomServiceGetXcomEntryKey,
+ ...(queryKey ?? [
+ { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
+ ]),
+];
+export type XcomServiceGetXcomEntriesDefaultResponse = Awaited<
+ ReturnType<typeof XcomService.getXcomEntries>
+>;
+export type XcomServiceGetXcomEntriesQueryResult<
+ TData = XcomServiceGetXcomEntriesDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useXcomServiceGetXcomEntriesKey = "XcomServiceGetXcomEntries";
+export const UseXcomServiceGetXcomEntriesKeyFn = (
+ {
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+ xcomKey?: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useXcomServiceGetXcomEntriesKey,
+ ...(queryKey ?? [
+ { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey },
+ ]),
+];
export type TaskServiceGetTasksDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTasks>
>;
@@ -1468,39 +1534,6 @@ export const UseVariableServiceGetVariablesKeyFn = (
useVariableServiceGetVariablesKey,
...(queryKey ?? [{ limit, offset, orderBy }]),
];
-export type XcomServiceGetXcomEntryDefaultResponse = Awaited<
- ReturnType<typeof XcomService.getXcomEntry>
->;
-export type XcomServiceGetXcomEntryQueryResult<
- TData = XcomServiceGetXcomEntryDefaultResponse,
- TError = unknown,
-> = UseQueryResult<TData, TError>;
-export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry";
-export const UseXcomServiceGetXcomEntryKeyFn = (
- {
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }: {
- dagId: string;
- dagRunId: string;
- deserialize?: boolean;
- mapIndex?: number;
- stringify?: boolean;
- taskId: string;
- xcomKey: string;
- },
- queryKey?: Array<unknown>,
-) => [
- useXcomServiceGetXcomEntryKey,
- ...(queryKey ?? [
- { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
- ]),
-];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index ff83018fc89..8f423b4c083 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1891,6 +1891,118 @@ export const prefetchUseProviderServiceGetProviders = (
queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }),
queryFn: () => ProviderService.getProviders({ limit, offset }),
});
+/**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseXcomServiceGetXcomEntry = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ deserialize?: boolean;
+ mapIndex?: number;
+ stringify?: boolean;
+ taskId: string;
+ xcomKey: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseXcomServiceGetXcomEntryKeyFn({
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }),
+ queryFn: () =>
+ XcomService.getXcomEntry({
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }),
+ });
+/**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseXcomServiceGetXcomEntries = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+ xcomKey?: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }),
+ queryFn: () =>
+ XcomService.getXcomEntries({
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }),
+ });
/**
* Get Tasks
* Get tasks for DAG.
@@ -1987,61 +2099,6 @@ export const prefetchUseVariableServiceGetVariables = (
}),
queryFn: () => VariableService.getVariables({ limit, offset, orderBy }),
});
-/**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
-export const prefetchUseXcomServiceGetXcomEntry = (
- queryClient: QueryClient,
- {
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }: {
- dagId: string;
- dagRunId: string;
- deserialize?: boolean;
- mapIndex?: number;
- stringify?: boolean;
- taskId: string;
- xcomKey: string;
- },
-) =>
- queryClient.prefetchQuery({
- queryKey: Common.UseXcomServiceGetXcomEntryKeyFn({
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }),
- queryFn: () =>
- XcomService.getXcomEntry({
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }),
- });
/**
* Get Health
* @returns HealthInfoSchema Successful Response
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index ccc8bc1a12d..6ff3e83ccce 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2247,6 +2247,120 @@ export const useProviderServiceGetProviders = <
queryFn: () => ProviderService.getProviders({ limit, offset }) as TData,
...options,
});
+/**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntry = <
+ TData = Common.XcomServiceGetXcomEntryDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ deserialize?: boolean;
+ mapIndex?: number;
+ stringify?: boolean;
+ taskId: string;
+ xcomKey: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
+ { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
+ queryKey,
+ ),
+ queryFn: () =>
+ XcomService.getXcomEntry({
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }) as TData,
+ ...options,
+ });
+/**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntries = <
+ TData = Common.XcomServiceGetXcomEntriesDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+ xcomKey?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn(
+ { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey },
+ queryKey,
+ ),
+ queryFn: () =>
+ XcomService.getXcomEntries({
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }) as TData,
+ ...options,
+ });
/**
* Get Tasks
* Get tasks for DAG.
@@ -2370,62 +2484,6 @@ export const useVariableServiceGetVariables = <
VariableService.getVariables({ limit, offset, orderBy }) as TData,
...options,
});
-/**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
-export const useXcomServiceGetXcomEntry = <
- TData = Common.XcomServiceGetXcomEntryDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- {
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }: {
- dagId: string;
- dagRunId: string;
- deserialize?: boolean;
- mapIndex?: number;
- stringify?: boolean;
- taskId: string;
- xcomKey: string;
- },
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useQuery<TData, TError>({
- queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
- { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
- queryKey,
- ),
- queryFn: () =>
- XcomService.getXcomEntry({
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }) as TData,
- ...options,
- });
/**
* Get Health
* @returns HealthInfoSchema Successful Response
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 49125fef08d..11386ab5d1f 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -2225,6 +2225,120 @@ export const useProviderServiceGetProvidersSuspense = <
queryFn: () => ProviderService.getProviders({ limit, offset }) as TData,
...options,
});
+/**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntrySuspense = <
+ TData = Common.XcomServiceGetXcomEntryDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ deserialize?: boolean;
+ mapIndex?: number;
+ stringify?: boolean;
+ taskId: string;
+ xcomKey: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
+ { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
+ queryKey,
+ ),
+ queryFn: () =>
+ XcomService.getXcomEntry({
+ dagId,
+ dagRunId,
+ deserialize,
+ mapIndex,
+ stringify,
+ taskId,
+ xcomKey,
+ }) as TData,
+ ...options,
+ });
+/**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntriesSuspense = <
+ TData = Common.XcomServiceGetXcomEntriesDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+ xcomKey?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn(
+ { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey },
+ queryKey,
+ ),
+ queryFn: () =>
+ XcomService.getXcomEntries({
+ dagId,
+ dagRunId,
+ limit,
+ mapIndex,
+ offset,
+ taskId,
+ xcomKey,
+ }) as TData,
+ ...options,
+ });
/**
* Get Tasks
* Get tasks for DAG.
@@ -2348,62 +2462,6 @@ export const useVariableServiceGetVariablesSuspense = <
VariableService.getVariables({ limit, offset, orderBy }) as TData,
...options,
});
-/**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
-export const useXcomServiceGetXcomEntrySuspense = <
- TData = Common.XcomServiceGetXcomEntryDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- {
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }: {
- dagId: string;
- dagRunId: string;
- deserialize?: boolean;
- mapIndex?: number;
- stringify?: boolean;
- taskId: string;
- xcomKey: string;
- },
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useSuspenseQuery<TData, TError>({
- queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
- { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
- queryKey,
- ),
- queryFn: () =>
- XcomService.getXcomEntry({
- dagId,
- dagRunId,
- deserialize,
- mapIndex,
- stringify,
- taskId,
- xcomKey,
- }) as TData,
- ...options,
- });
/**
* Get Health
* @returns HealthInfoSchema Successful Response
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 767b86fc82a..8002b9d37f6 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5064,6 +5064,68 @@ export const $VersionInfo = {
description: "Version information serializer for responses.",
} as const;
+export const $XComCollection = {
+ properties: {
+ xcom_entries: {
+ items: {
+ $ref: "#/components/schemas/XComResponse",
+ },
+ type: "array",
+ title: "Xcom Entries",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["xcom_entries", "total_entries"],
+ title: "XComCollection",
+ description: "List of XCom items.",
+} as const;
+
+export const $XComResponse = {
+ properties: {
+ key: {
+ type: "string",
+ title: "Key",
+ },
+ timestamp: {
+ type: "string",
+ format: "date-time",
+ title: "Timestamp",
+ },
+ logical_date: {
+ type: "string",
+ format: "date-time",
+ title: "Logical Date",
+ },
+ map_index: {
+ type: "integer",
+ title: "Map Index",
+ },
+ task_id: {
+ type: "string",
+ title: "Task Id",
+ },
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ },
+ type: "object",
+ required: [
+ "key",
+ "timestamp",
+ "logical_date",
+ "map_index",
+ "task_id",
+ "dag_id",
+ ],
+ title: "XComResponse",
+ description: "Serializer for a xcom item.",
+} as const;
+
export const $XComResponseNative = {
properties: {
key: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 1f14639496d..d8cb33bceec 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -150,6 +150,10 @@ import type {
PostPoolsResponse,
GetProvidersData,
GetProvidersResponse,
+ GetXcomEntryData,
+ GetXcomEntryResponse,
+ GetXcomEntriesData,
+ GetXcomEntriesResponse,
GetTasksData,
GetTasksResponse,
GetTaskData,
@@ -164,8 +168,6 @@ import type {
GetVariablesResponse,
PostVariableData,
PostVariableResponse,
- GetXcomEntryData,
- GetXcomEntryResponse,
GetHealthResponse,
GetVersionResponse,
} from "./types.gen";
@@ -2608,6 +2610,92 @@ export class ProviderService {
}
}
+export class XcomService {
+ /**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+ public static getXcomEntry(
+ data: GetXcomEntryData,
+ ): CancelablePromise<GetXcomEntryResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url:
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
+ path: {
+ dag_id: data.dagId,
+ task_id: data.taskId,
+ dag_run_id: data.dagRunId,
+ xcom_key: data.xcomKey,
+ },
+ query: {
+ map_index: data.mapIndex,
+ deserialize: data.deserialize,
+ stringify: data.stringify,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
+ /**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+ public static getXcomEntries(
+ data: GetXcomEntriesData,
+ ): CancelablePromise<GetXcomEntriesResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url:
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId,
+ },
+ query: {
+ xcom_key: data.xcomKey,
+ map_index: data.mapIndex,
+ limit: data.limit,
+ offset: data.offset,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+}
+
export class TaskService {
/**
* Get Tasks
@@ -2809,49 +2897,6 @@ export class VariableService {
}
}
-export class XcomService {
- /**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
- public static getXcomEntry(
- data: GetXcomEntryData,
- ): CancelablePromise<GetXcomEntryResponse> {
- return __request(OpenAPI, {
- method: "GET",
- url:
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
- path: {
- dag_id: data.dagId,
- task_id: data.taskId,
- dag_run_id: data.dagRunId,
- xcom_key: data.xcomKey,
- },
- query: {
- map_index: data.mapIndex,
- deserialize: data.deserialize,
- stringify: data.stringify,
- },
- errors: {
- 400: "Bad Request",
- 401: "Unauthorized",
- 403: "Forbidden",
- 404: "Not Found",
- 422: "Validation Error",
- },
- });
- }
-}
-
export class MonitorService {
/**
* Get Health
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index f130c187c71..bdcce0157dc 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1185,6 +1185,26 @@ export type VersionInfo = {
git_version: string | null;
};
+/**
+ * List of XCom items.
+ */
+export type XComCollection = {
+ xcom_entries: Array<XComResponse>;
+ total_entries: number;
+};
+
+/**
+ * Serializer for a xcom item.
+ */
+export type XComResponse = {
+ key: string;
+ timestamp: string;
+ logical_date: string;
+ map_index: number;
+ task_id: string;
+ dag_id: string;
+};
+
/**
* XCom response serializer with native return type.
*/
@@ -1866,6 +1886,30 @@ export type GetProvidersData = {
export type GetProvidersResponse = ProviderCollectionResponse;
+export type GetXcomEntryData = {
+ dagId: string;
+ dagRunId: string;
+ deserialize?: boolean;
+ mapIndex?: number;
+ stringify?: boolean;
+ taskId: string;
+ xcomKey: string;
+};
+
+export type GetXcomEntryResponse = XComResponseNative | XComResponseString;
+
+export type GetXcomEntriesData = {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number | null;
+ offset?: number;
+ taskId: string;
+ xcomKey?: string | null;
+};
+
+export type GetXcomEntriesResponse = XComCollection;
+
export type GetTasksData = {
dagId: string;
orderBy?: string;
@@ -1914,18 +1958,6 @@ export type PostVariableData = {
export type PostVariableResponse = VariableResponse;
-export type GetXcomEntryData = {
- dagId: string;
- dagRunId: string;
- deserialize?: boolean;
- mapIndex?: number;
- stringify?: boolean;
- taskId: string;
- xcomKey: string;
-};
-
-export type GetXcomEntryResponse = XComResponseNative | XComResponseString;
-
export type GetHealthResponse = HealthInfoSchema;
export type GetVersionResponse = VersionInfo;
@@ -3906,6 +3938,68 @@ export type $OpenApiTs = {
};
};
};
+
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}":
{
+ get: {
+ req: GetXcomEntryData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: XComResponseNative | XComResponseString;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries":
{
+ get: {
+ req: GetXcomEntriesData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: XComCollection;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/dags/{dag_id}/tasks": {
get: {
req: GetTasksData;
@@ -4093,37 +4187,6 @@ export type $OpenApiTs = {
};
};
};
-
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}":
{
- get: {
- req: GetXcomEntryData;
- res: {
- /**
- * Successful Response
- */
- 200: XComResponseNative | XComResponseString;
- /**
- * Bad Request
- */
- 400: HTTPExceptionResponse;
- /**
- * Unauthorized
- */
- 401: HTTPExceptionResponse;
- /**
- * Forbidden
- */
- 403: HTTPExceptionResponse;
- /**
- * Not Found
- */
- 404: HTTPExceptionResponse;
- /**
- * Validation Error
- */
- 422: HTTPValidationError;
- };
- };
- };
"/public/monitor/health": {
get: {
res: {
diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py
b/tests/api_fastapi/core_api/routes/public/test_xcom.py
index e0010c79fef..022987125fb 100644
--- a/tests/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py
@@ -21,6 +21,7 @@ from unittest import mock
import pytest
from airflow.models import XCom
+from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import BaseXCom, resolve_xcom_backend
@@ -36,13 +37,17 @@ pytestmark = pytest.mark.db_test
TEST_XCOM_KEY = "test_xcom_key"
TEST_XCOM_VALUE = {"key": "value"}
-TEST_XCOM_KEY3 = "test_xcom_key_non_existing"
+TEST_XCOM_KEY_2 = "test_xcom_key_non_existing"
TEST_DAG_ID = "test-dag-id"
TEST_TASK_ID = "test-task-id"
TEST_EXECUTION_DATE = "2005-04-02T00:00:00+00:00"
+TEST_DAG_ID_2 = "test-dag-id-2"
+TEST_TASK_ID_2 = "test-task-id-2"
+
logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE)
+logical_date_formatted = logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ")
run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
@@ -92,18 +97,18 @@ class TestXComEndpoint:
@pytest.fixture(autouse=True)
def setup(self) -> None:
self.clear_db()
+ _create_dag_run()
def teardown_method(self) -> None:
self.clear_db()
- def create_xcom(self, key, value, backend=XCom) -> None:
- _create_dag_run()
+ def _create_xcom(self, key, value, backend=XCom) -> None:
_create_xcom(key, value, backend)
class TestGetXComEntry(TestXComEndpoint):
def test_should_respond_200_stringify(self, test_client):
- self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
+ self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
response = test_client.get(
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}"
)
@@ -121,7 +126,7 @@ class TestGetXComEntry(TestXComEndpoint):
}
def test_should_respond_200_native(self, test_client):
- self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
+ self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
response = test_client.get(
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}?stringify=false"
)
@@ -140,10 +145,10 @@ class TestGetXComEntry(TestXComEndpoint):
def test_should_raise_404_for_non_existent_xcom(self, test_client):
response = test_client.get(
-
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY3}"
+
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY_2}"
)
assert response.status_code == 404
- assert response.json()["detail"] == f"XCom entry with key:
`{TEST_XCOM_KEY3}` not found"
+ assert response.json()["detail"] == f"XCom entry with key:
`{TEST_XCOM_KEY_2}` not found"
@pytest.mark.parametrize(
"support_deserialize, params, expected_status_or_value",
@@ -191,7 +196,7 @@ class TestGetXComEntry(TestXComEndpoint):
self, support_deserialize: bool, params: str,
expected_status_or_value: int | str, test_client
):
XCom = resolve_xcom_backend()
- self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom)
+ self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom)
url =
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}"
with
mock.patch("airflow.api_fastapi.core_api.routes.public.xcom.XCom", XCom):
@@ -203,3 +208,279 @@ class TestGetXComEntry(TestXComEndpoint):
else:
assert response.status_code == 200
assert response.json()["value"] == expected_status_or_value
+
+
+class TestGetXComEntries(TestXComEndpoint):
+ @pytest.fixture(autouse=True)
+ def setup(self) -> None:
+ self.clear_db()
+
+ def test_should_respond_200(self, test_client):
+ self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed,
TEST_TASK_ID)
+ response = test_client.get(
+
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries"
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+ for xcom_entry in response_data["xcom_entries"]:
+ xcom_entry["timestamp"] = "TIMESTAMP"
+
+ expected_response = {
+ "xcom_entries": [
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": f"{TEST_XCOM_KEY}-0",
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": -1,
+ },
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": f"{TEST_XCOM_KEY}-1",
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": -1,
+ },
+ ],
+ "total_entries": 2,
+ }
+ assert response_data == expected_response
+
+ def test_should_respond_200_with_tilde(self, test_client):
+ self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed,
TEST_TASK_ID)
+ self._create_xcom_entries(TEST_DAG_ID_2, run_id, logical_date_parsed,
TEST_TASK_ID_2)
+
+ response =
test_client.get("/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries")
+ assert response.status_code == 200
+ response_data = response.json()
+ for xcom_entry in response_data["xcom_entries"]:
+ xcom_entry["timestamp"] = "TIMESTAMP"
+
+ expected_response = {
+ "xcom_entries": [
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": f"{TEST_XCOM_KEY}-0",
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": -1,
+ },
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": f"{TEST_XCOM_KEY}-1",
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": -1,
+ },
+ {
+ "dag_id": TEST_DAG_ID_2,
+ "logical_date": logical_date_formatted,
+ "key": f"{TEST_XCOM_KEY}-0",
+ "task_id": TEST_TASK_ID_2,
+ "timestamp": "TIMESTAMP",
+ "map_index": -1,
+ },
+ {
+ "dag_id": TEST_DAG_ID_2,
+ "logical_date": logical_date_formatted,
+ "key": f"{TEST_XCOM_KEY}-1",
+ "task_id": TEST_TASK_ID_2,
+ "timestamp": "TIMESTAMP",
+ "map_index": -1,
+ },
+ ],
+ "total_entries": 4,
+ }
+ assert response_data == expected_response
+
+ @pytest.mark.parametrize("map_index", (0, 1, None))
+ def test_should_respond_200_with_map_index(self, map_index, test_client):
+ self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed,
TEST_TASK_ID, mapped_ti=True)
+
+ response = test_client.get(
+ "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries",
+ params={"map_index": map_index} if map_index is not None else None,
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+
+ if map_index is None:
+ expected_entries = [
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": TEST_XCOM_KEY,
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": idx,
+ }
+ for idx in range(2)
+ ]
+ else:
+ expected_entries = [
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": TEST_XCOM_KEY,
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": map_index,
+ }
+ ]
+ for xcom_entry in response_data["xcom_entries"]:
+ xcom_entry["timestamp"] = "TIMESTAMP"
+ assert response_data == {
+ "xcom_entries": expected_entries,
+ "total_entries": len(expected_entries),
+ }
+
+ @pytest.mark.parametrize(
+ "key, expected_entries",
+ [
+ (
+ TEST_XCOM_KEY,
+ [
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": TEST_XCOM_KEY,
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": 0,
+ },
+ {
+ "dag_id": TEST_DAG_ID,
+ "logical_date": logical_date_formatted,
+ "key": TEST_XCOM_KEY,
+ "task_id": TEST_TASK_ID,
+ "timestamp": "TIMESTAMP",
+ "map_index": 1,
+ },
+ ],
+ ),
+ (f"{TEST_XCOM_KEY}-0", []),
+ ],
+ )
+ def test_should_respond_200_with_xcom_key(self, key, expected_entries,
test_client):
+ self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed,
TEST_TASK_ID, mapped_ti=True)
+ response = test_client.get(
+ "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries",
+ params={"xcom_key": key} if key is not None else None,
+ )
+
+ assert response.status_code == 200
+ response_data = response.json()
+ for xcom_entry in response_data["xcom_entries"]:
+ xcom_entry["timestamp"] = "TIMESTAMP"
+ assert response_data == {
+ "xcom_entries": expected_entries,
+ "total_entries": len(expected_entries),
+ }
+
+ @provide_session
+ def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id,
mapped_ti=False, session=None):
+ dag = DagModel(dag_id=dag_id)
+ session.add(dag)
+ dagrun = DagRun(
+ dag_id=dag_id,
+ run_id=run_id,
+ logical_date=logical_date,
+ start_date=logical_date,
+ run_type=DagRunType.MANUAL,
+ )
+ session.add(dagrun)
+ if mapped_ti:
+ for i in [0, 1]:
+ ti = TaskInstance(EmptyOperator(task_id=task_id),
run_id=run_id, map_index=i)
+ ti.dag_id = dag_id
+ session.add(ti)
+ else:
+ ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id)
+ ti.dag_id = dag_id
+ session.add(ti)
+ session.commit()
+
+ for i in [0, 1]:
+ if mapped_ti:
+ key = TEST_XCOM_KEY
+ map_index = i
+ else:
+ key = f"{TEST_XCOM_KEY}-{i}"
+ map_index = -1
+
+ XCom.set(
+ key=key,
+ value=TEST_XCOM_VALUE,
+ run_id=run_id,
+ task_id=task_id,
+ dag_id=dag_id,
+ map_index=map_index,
+ )
+
+
+class TestPaginationGetXComEntries(TestXComEndpoint):
+ @pytest.mark.parametrize(
+ "query_params, expected_xcom_ids",
+ [
+ (
+ {"limit": "1"},
+ ["TEST_XCOM_KEY0"],
+ ),
+ (
+ {"limit": "2"},
+ ["TEST_XCOM_KEY0", "TEST_XCOM_KEY1"],
+ ),
+ (
+ {"offset": "5"},
+ [
+ "TEST_XCOM_KEY5",
+ "TEST_XCOM_KEY6",
+ "TEST_XCOM_KEY7",
+ "TEST_XCOM_KEY8",
+ "TEST_XCOM_KEY9",
+ ],
+ ),
+ (
+ {"offset": "0"},
+ [
+ "TEST_XCOM_KEY0",
+ "TEST_XCOM_KEY1",
+ "TEST_XCOM_KEY2",
+ "TEST_XCOM_KEY3",
+ "TEST_XCOM_KEY4",
+ "TEST_XCOM_KEY5",
+ "TEST_XCOM_KEY6",
+ "TEST_XCOM_KEY7",
+ "TEST_XCOM_KEY8",
+ "TEST_XCOM_KEY9",
+ ],
+ ),
+ (
+ {"limit": "1", "offset": "5"},
+ ["TEST_XCOM_KEY5"],
+ ),
+ (
+ {"limit": "1", "offset": "1"},
+ ["TEST_XCOM_KEY1"],
+ ),
+ (
+ {"limit": "2", "offset": "2"},
+ ["TEST_XCOM_KEY2", "TEST_XCOM_KEY3"],
+ ),
+ ],
+ )
+ def test_handle_limit_offset(self, query_params, expected_xcom_ids,
test_client):
+ for i in range(10):
+ self._create_xcom(f"TEST_XCOM_KEY{i}", TEST_XCOM_VALUE)
+ response = test_client.get(
+ "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries",
params=query_params
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data["total_entries"] == 10
+ conn_ids = [conn["key"] for conn in response_data["xcom_entries"] if
conn]
+ assert conn_ids == expected_xcom_ids