This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a18bcd7a1b5 AIP-84 Migrate XCom get entries endpoint to Fastapi (#44366)
a18bcd7a1b5 is described below

commit a18bcd7a1b54062d248c025d8a8148d4e859a9ae
Author: michaeljs-c <[email protected]>
AuthorDate: Tue Nov 26 20:21:09 2024 +0000

    AIP-84 Migrate XCom get entries endpoint to Fastapi (#44366)
---
 airflow/api_connexion/endpoints/xcom_endpoint.py   |   1 +
 airflow/api_fastapi/core_api/datamodels/xcom.py    |   7 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 336 +++++++++++++++------
 .../api_fastapi/core_api/routes/public/__init__.py |   2 +-
 airflow/api_fastapi/core_api/routes/public/xcom.py |  54 +++-
 airflow/ui/openapi-gen/queries/common.ts           |  99 ++++--
 airflow/ui/openapi-gen/queries/prefetch.ts         | 167 ++++++----
 airflow/ui/openapi-gen/queries/queries.ts          | 170 +++++++----
 airflow/ui/openapi-gen/queries/suspense.ts         | 170 +++++++----
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  62 ++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 135 ++++++---
 airflow/ui/openapi-gen/requests/types.gen.ts       | 149 ++++++---
 .../core_api/routes/public/test_xcom.py            | 297 +++++++++++++++++-
 13 files changed, 1256 insertions(+), 393 deletions(-)

diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py
index c86617391ab..cb3faf7379e 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -45,6 +45,7 @@ if TYPE_CHECKING:
     from airflow.api_connexion.types import APIResponse
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.XCOM)
 @format_parameters({"limit": check_limit})
 @provide_session
diff --git a/airflow/api_fastapi/core_api/datamodels/xcom.py b/airflow/api_fastapi/core_api/datamodels/xcom.py
index 370aa651cb2..4e3c6f54a7a 100644
--- a/airflow/api_fastapi/core_api/datamodels/xcom.py
+++ b/airflow/api_fastapi/core_api/datamodels/xcom.py
@@ -49,3 +49,10 @@ class XComResponseString(XComResponse):
     @field_validator("value", mode="before")
     def value_to_string(cls, v):
         return str(v) if v is not None else None
+
+
+class XComCollection(BaseModel):
+    """List of XCom items."""
+
+    xcom_entries: list[XComResponse]
+    total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 2610dbcf612..c53f68a8438 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -3651,6 +3651,200 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
+    get:
+      tags:
+      - XCom
+      summary: Get Xcom Entry
+      description: Get an XCom entry.
+      operationId: get_xcom_entry
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: xcom_key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Xcom Key
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: -1
+          default: -1
+          title: Map Index
+      - name: deserialize
+        in: query
+        required: false
+        schema:
+          type: boolean
+          default: false
+          title: Deserialize
+      - name: stringify
+        in: query
+        required: false
+        schema:
+          type: boolean
+          default: true
+          title: Stringify
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                anyOf:
+                - $ref: '#/components/schemas/XComResponseNative'
+                - $ref: '#/components/schemas/XComResponseString'
+                title: Response Get Xcom Entry
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries:
+    get:
+      tags:
+      - XCom
+      summary: Get Xcom Entries
+      description: 'Get all XCom entries.
+
+
+        This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
+        retrieve XCom entries for all DAGs.'
+      operationId: get_xcom_entries
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: xcom_key
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Xcom Key
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: integer
+            minimum: -1
+          - type: 'null'
+          title: Map Index
+      - name: limit
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 100
+          title: Limit
+      - name: offset
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 0
+          title: Offset
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/XComCollection'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}:
     get:
       tags:
@@ -5199,100 +5393,6 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
-    get:
-      tags:
-      - XCom
-      summary: Get Xcom Entry
-      description: Get an XCom entry.
-      operationId: get_xcom_entry
-      parameters:
-      - name: dag_id
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Dag Id
-      - name: task_id
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Task Id
-      - name: dag_run_id
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Dag Run Id
-      - name: xcom_key
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Xcom Key
-      - name: map_index
-        in: query
-        required: false
-        schema:
-          type: integer
-          minimum: -1
-          default: -1
-          title: Map Index
-      - name: deserialize
-        in: query
-        required: false
-        schema:
-          type: boolean
-          default: false
-          title: Deserialize
-      - name: stringify
-        in: query
-        required: false
-        schema:
-          type: boolean
-          default: true
-          title: Stringify
-      responses:
-        '200':
-          description: Successful Response
-          content:
-            application/json:
-              schema:
-                anyOf:
-                - $ref: '#/components/schemas/XComResponseNative'
-                - $ref: '#/components/schemas/XComResponseString'
-                title: Response Get Xcom Entry
-        '401':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Unauthorized
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Bad Request
-        '404':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Not Found
-        '422':
-          description: Validation Error
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
   
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
     get:
       tags:
@@ -8715,6 +8815,54 @@ components:
       - git_version
       title: VersionInfo
       description: Version information serializer for responses.
+    XComCollection:
+      properties:
+        xcom_entries:
+          items:
+            $ref: '#/components/schemas/XComResponse'
+          type: array
+          title: Xcom Entries
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - xcom_entries
+      - total_entries
+      title: XComCollection
+      description: List of XCom items.
+    XComResponse:
+      properties:
+        key:
+          type: string
+          title: Key
+        timestamp:
+          type: string
+          format: date-time
+          title: Timestamp
+        logical_date:
+          type: string
+          format: date-time
+          title: Logical Date
+        map_index:
+          type: integer
+          title: Map Index
+        task_id:
+          type: string
+          title: Task Id
+        dag_id:
+          type: string
+          title: Dag Id
+      type: object
+      required:
+      - key
+      - timestamp
+      - logical_date
+      - map_index
+      - task_id
+      - dag_id
+      title: XComResponse
+      description: Serializer for a xcom item.
     XComResponseNative:
       properties:
         key:
diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py
index e2dbed54f71..3e05eb87680 100644
--- a/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -68,10 +68,10 @@ authenticated_router.include_router(job_router)
 authenticated_router.include_router(plugins_router)
 authenticated_router.include_router(pools_router)
 authenticated_router.include_router(providers_router)
+authenticated_router.include_router(xcom_router)
 authenticated_router.include_router(task_instances_router)
 authenticated_router.include_router(tasks_router)
 authenticated_router.include_router(variables_router)
-authenticated_router.include_router(xcom_router)
 authenticated_router.include_router(task_instances_log_router)
 
 
diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow/api_fastapi/core_api/routes/public/xcom.py
index 3a8e6130e45..1d4b154fd87 100644
--- a/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -23,9 +23,11 @@ from fastapi import Depends, HTTPException, Query, status
 from sqlalchemy import and_, select
 from sqlalchemy.orm import Session
 
-from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.db.common import get_session, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.xcom import (
+    XComCollection,
     XComResponseNative,
     XComResponseString,
 )
@@ -90,5 +92,53 @@ def get_xcom_entry(
 
     if stringify:
         return XComResponseString.model_validate(item)
-
     return XComResponseNative.model_validate(item)
+
+
+@xcom_router.get(
+    "",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+)
+def get_xcom_entries(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: Annotated[Session, Depends(get_session)],
+    xcom_key: Annotated[str | None, Query()] = None,
+    map_index: Annotated[int | None, Query(ge=-1)] = None,
+) -> XComCollection:
+    """
+    Get all XCom entries.
+
+    This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs.
+    """
+    query = select(XCom)
+    if dag_id != "~":
+        query = query.where(XCom.dag_id == dag_id)
+    query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id))
+
+    if task_id != "~":
+        query = query.where(XCom.task_id == task_id)
+    if dag_run_id != "~":
+        query = query.where(DR.run_id == dag_run_id)
+    if map_index is not None:
+        query = query.where(XCom.map_index == map_index)
+    if xcom_key is not None:
+        query = query.where(XCom.key == xcom_key)
+
+    query, total_entries = paginated_select(
+        statement=query,
+        offset=offset,
+        limit=limit,
+        session=session,
+    )
+    query = query.order_by(XCom.dag_id, XCom.task_id, XCom.run_id, XCom.map_index, XCom.key)
+    xcoms = session.scalars(query)
+    return XComCollection(xcom_entries=xcoms, total_entries=total_entries)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 4696713d203..e4bc5f300d8 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1393,6 +1393,72 @@ export const UseProviderServiceGetProvidersKeyFn = (
   } = {},
   queryKey?: Array<unknown>,
 ) => [useProviderServiceGetProvidersKey, ...(queryKey ?? [{ limit, offset }])];
+export type XcomServiceGetXcomEntryDefaultResponse = Awaited<
+  ReturnType<typeof XcomService.getXcomEntry>
+>;
+export type XcomServiceGetXcomEntryQueryResult<
+  TData = XcomServiceGetXcomEntryDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry";
+export const UseXcomServiceGetXcomEntryKeyFn = (
+  {
+    dagId,
+    dagRunId,
+    deserialize,
+    mapIndex,
+    stringify,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    deserialize?: boolean;
+    mapIndex?: number;
+    stringify?: boolean;
+    taskId: string;
+    xcomKey: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useXcomServiceGetXcomEntryKey,
+  ...(queryKey ?? [
+    { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
+  ]),
+];
+export type XcomServiceGetXcomEntriesDefaultResponse = Awaited<
+  ReturnType<typeof XcomService.getXcomEntries>
+>;
+export type XcomServiceGetXcomEntriesQueryResult<
+  TData = XcomServiceGetXcomEntriesDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useXcomServiceGetXcomEntriesKey = "XcomServiceGetXcomEntries";
+export const UseXcomServiceGetXcomEntriesKeyFn = (
+  {
+    dagId,
+    dagRunId,
+    limit,
+    mapIndex,
+    offset,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    limit?: number;
+    mapIndex?: number;
+    offset?: number;
+    taskId: string;
+    xcomKey?: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useXcomServiceGetXcomEntriesKey,
+  ...(queryKey ?? [
+    { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey },
+  ]),
+];
 export type TaskServiceGetTasksDefaultResponse = Awaited<
   ReturnType<typeof TaskService.getTasks>
 >;
@@ -1468,39 +1534,6 @@ export const UseVariableServiceGetVariablesKeyFn = (
   useVariableServiceGetVariablesKey,
   ...(queryKey ?? [{ limit, offset, orderBy }]),
 ];
-export type XcomServiceGetXcomEntryDefaultResponse = Awaited<
-  ReturnType<typeof XcomService.getXcomEntry>
->;
-export type XcomServiceGetXcomEntryQueryResult<
-  TData = XcomServiceGetXcomEntryDefaultResponse,
-  TError = unknown,
-> = UseQueryResult<TData, TError>;
-export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry";
-export const UseXcomServiceGetXcomEntryKeyFn = (
-  {
-    dagId,
-    dagRunId,
-    deserialize,
-    mapIndex,
-    stringify,
-    taskId,
-    xcomKey,
-  }: {
-    dagId: string;
-    dagRunId: string;
-    deserialize?: boolean;
-    mapIndex?: number;
-    stringify?: boolean;
-    taskId: string;
-    xcomKey: string;
-  },
-  queryKey?: Array<unknown>,
-) => [
-  useXcomServiceGetXcomEntryKey,
-  ...(queryKey ?? [
-    { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
-  ]),
-];
 export type MonitorServiceGetHealthDefaultResponse = Awaited<
   ReturnType<typeof MonitorService.getHealth>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index ff83018fc89..8f423b4c083 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1891,6 +1891,118 @@ export const prefetchUseProviderServiceGetProviders = (
     queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }),
     queryFn: () => ProviderService.getProviders({ limit, offset }),
   });
+/**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseXcomServiceGetXcomEntry = (
+  queryClient: QueryClient,
+  {
+    dagId,
+    dagRunId,
+    deserialize,
+    mapIndex,
+    stringify,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    deserialize?: boolean;
+    mapIndex?: number;
+    stringify?: boolean;
+    taskId: string;
+    xcomKey: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseXcomServiceGetXcomEntryKeyFn({
+      dagId,
+      dagRunId,
+      deserialize,
+      mapIndex,
+      stringify,
+      taskId,
+      xcomKey,
+    }),
+    queryFn: () =>
+      XcomService.getXcomEntry({
+        dagId,
+        dagRunId,
+        deserialize,
+        mapIndex,
+        stringify,
+        taskId,
+        xcomKey,
+      }),
+  });
+/**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseXcomServiceGetXcomEntries = (
+  queryClient: QueryClient,
+  {
+    dagId,
+    dagRunId,
+    limit,
+    mapIndex,
+    offset,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    limit?: number;
+    mapIndex?: number;
+    offset?: number;
+    taskId: string;
+    xcomKey?: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({
+      dagId,
+      dagRunId,
+      limit,
+      mapIndex,
+      offset,
+      taskId,
+      xcomKey,
+    }),
+    queryFn: () =>
+      XcomService.getXcomEntries({
+        dagId,
+        dagRunId,
+        limit,
+        mapIndex,
+        offset,
+        taskId,
+        xcomKey,
+      }),
+  });
 /**
  * Get Tasks
  * Get tasks for DAG.
@@ -1987,61 +2099,6 @@ export const prefetchUseVariableServiceGetVariables = (
     }),
     queryFn: () => VariableService.getVariables({ limit, offset, orderBy }),
   });
-/**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
-export const prefetchUseXcomServiceGetXcomEntry = (
-  queryClient: QueryClient,
-  {
-    dagId,
-    dagRunId,
-    deserialize,
-    mapIndex,
-    stringify,
-    taskId,
-    xcomKey,
-  }: {
-    dagId: string;
-    dagRunId: string;
-    deserialize?: boolean;
-    mapIndex?: number;
-    stringify?: boolean;
-    taskId: string;
-    xcomKey: string;
-  },
-) =>
-  queryClient.prefetchQuery({
-    queryKey: Common.UseXcomServiceGetXcomEntryKeyFn({
-      dagId,
-      dagRunId,
-      deserialize,
-      mapIndex,
-      stringify,
-      taskId,
-      xcomKey,
-    }),
-    queryFn: () =>
-      XcomService.getXcomEntry({
-        dagId,
-        dagRunId,
-        deserialize,
-        mapIndex,
-        stringify,
-        taskId,
-        xcomKey,
-      }),
-  });
 /**
  * Get Health
  * @returns HealthInfoSchema Successful Response
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index ccc8bc1a12d..6ff3e83ccce 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2247,6 +2247,120 @@ export const useProviderServiceGetProviders = <
     queryFn: () => ProviderService.getProviders({ limit, offset }) as TData,
     ...options,
   });
+/**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntry = <
+  TData = Common.XcomServiceGetXcomEntryDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+    deserialize,
+    mapIndex,
+    stringify,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    deserialize?: boolean;
+    mapIndex?: number;
+    stringify?: boolean;
+    taskId: string;
+    xcomKey: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
+      { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
+      queryKey,
+    ),
+    queryFn: () =>
+      XcomService.getXcomEntry({
+        dagId,
+        dagRunId,
+        deserialize,
+        mapIndex,
+        stringify,
+        taskId,
+        xcomKey,
+      }) as TData,
+    ...options,
+  });
+/**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntries = <
+  TData = Common.XcomServiceGetXcomEntriesDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+    limit,
+    mapIndex,
+    offset,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    limit?: number;
+    mapIndex?: number;
+    offset?: number;
+    taskId: string;
+    xcomKey?: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn(
+      { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey },
+      queryKey,
+    ),
+    queryFn: () =>
+      XcomService.getXcomEntries({
+        dagId,
+        dagRunId,
+        limit,
+        mapIndex,
+        offset,
+        taskId,
+        xcomKey,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Tasks
  * Get tasks for DAG.
@@ -2370,62 +2484,6 @@ export const useVariableServiceGetVariables = <
       VariableService.getVariables({ limit, offset, orderBy }) as TData,
     ...options,
   });
-/**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
-export const useXcomServiceGetXcomEntry = <
-  TData = Common.XcomServiceGetXcomEntryDefaultResponse,
-  TError = unknown,
-  TQueryKey extends Array<unknown> = unknown[],
->(
-  {
-    dagId,
-    dagRunId,
-    deserialize,
-    mapIndex,
-    stringify,
-    taskId,
-    xcomKey,
-  }: {
-    dagId: string;
-    dagRunId: string;
-    deserialize?: boolean;
-    mapIndex?: number;
-    stringify?: boolean;
-    taskId: string;
-    xcomKey: string;
-  },
-  queryKey?: TQueryKey,
-  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
-  useQuery<TData, TError>({
-    queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
-      { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
-      queryKey,
-    ),
-    queryFn: () =>
-      XcomService.getXcomEntry({
-        dagId,
-        dagRunId,
-        deserialize,
-        mapIndex,
-        stringify,
-        taskId,
-        xcomKey,
-      }) as TData,
-    ...options,
-  });
 /**
  * Get Health
  * @returns HealthInfoSchema Successful Response
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 49125fef08d..11386ab5d1f 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -2225,6 +2225,120 @@ export const useProviderServiceGetProvidersSuspense = <
     queryFn: () => ProviderService.getProviders({ limit, offset }) as TData,
     ...options,
   });
+/**
+ * Get Xcom Entry
+ * Get an XCom entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.taskId
+ * @param data.dagRunId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.deserialize
+ * @param data.stringify
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntrySuspense = <
+  TData = Common.XcomServiceGetXcomEntryDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+    deserialize,
+    mapIndex,
+    stringify,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    deserialize?: boolean;
+    mapIndex?: number;
+    stringify?: boolean;
+    taskId: string;
+    xcomKey: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
+      { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
+      queryKey,
+    ),
+    queryFn: () =>
+      XcomService.getXcomEntry({
+        dagId,
+        dagRunId,
+        deserialize,
+        mapIndex,
+        stringify,
+        taskId,
+        xcomKey,
+      }) as TData,
+    ...options,
+  });
+/**
+ * Get Xcom Entries
+ * Get all XCom entries.
+ *
+ * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.xcomKey
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns XComCollection Successful Response
+ * @throws ApiError
+ */
+export const useXcomServiceGetXcomEntriesSuspense = <
+  TData = Common.XcomServiceGetXcomEntriesDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+    limit,
+    mapIndex,
+    offset,
+    taskId,
+    xcomKey,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    limit?: number;
+    mapIndex?: number;
+    offset?: number;
+    taskId: string;
+    xcomKey?: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn(
+      { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey },
+      queryKey,
+    ),
+    queryFn: () =>
+      XcomService.getXcomEntries({
+        dagId,
+        dagRunId,
+        limit,
+        mapIndex,
+        offset,
+        taskId,
+        xcomKey,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Tasks
  * Get tasks for DAG.
@@ -2348,62 +2462,6 @@ export const useVariableServiceGetVariablesSuspense = <
       VariableService.getVariables({ limit, offset, orderBy }) as TData,
     ...options,
   });
-/**
- * Get Xcom Entry
- * Get an XCom entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.taskId
- * @param data.dagRunId
- * @param data.xcomKey
- * @param data.mapIndex
- * @param data.deserialize
- * @param data.stringify
- * @returns unknown Successful Response
- * @throws ApiError
- */
-export const useXcomServiceGetXcomEntrySuspense = <
-  TData = Common.XcomServiceGetXcomEntryDefaultResponse,
-  TError = unknown,
-  TQueryKey extends Array<unknown> = unknown[],
->(
-  {
-    dagId,
-    dagRunId,
-    deserialize,
-    mapIndex,
-    stringify,
-    taskId,
-    xcomKey,
-  }: {
-    dagId: string;
-    dagRunId: string;
-    deserialize?: boolean;
-    mapIndex?: number;
-    stringify?: boolean;
-    taskId: string;
-    xcomKey: string;
-  },
-  queryKey?: TQueryKey,
-  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
-  useSuspenseQuery<TData, TError>({
-    queryKey: Common.UseXcomServiceGetXcomEntryKeyFn(
-      { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey },
-      queryKey,
-    ),
-    queryFn: () =>
-      XcomService.getXcomEntry({
-        dagId,
-        dagRunId,
-        deserialize,
-        mapIndex,
-        stringify,
-        taskId,
-        xcomKey,
-      }) as TData,
-    ...options,
-  });
 /**
  * Get Health
  * @returns HealthInfoSchema Successful Response
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 767b86fc82a..8002b9d37f6 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5064,6 +5064,68 @@ export const $VersionInfo = {
   description: "Version information serializer for responses.",
 } as const;
 
+export const $XComCollection = {
+  properties: {
+    xcom_entries: {
+      items: {
+        $ref: "#/components/schemas/XComResponse",
+      },
+      type: "array",
+      title: "Xcom Entries",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["xcom_entries", "total_entries"],
+  title: "XComCollection",
+  description: "List of XCom items.",
+} as const;
+
+export const $XComResponse = {
+  properties: {
+    key: {
+      type: "string",
+      title: "Key",
+    },
+    timestamp: {
+      type: "string",
+      format: "date-time",
+      title: "Timestamp",
+    },
+    logical_date: {
+      type: "string",
+      format: "date-time",
+      title: "Logical Date",
+    },
+    map_index: {
+      type: "integer",
+      title: "Map Index",
+    },
+    task_id: {
+      type: "string",
+      title: "Task Id",
+    },
+    dag_id: {
+      type: "string",
+      title: "Dag Id",
+    },
+  },
+  type: "object",
+  required: [
+    "key",
+    "timestamp",
+    "logical_date",
+    "map_index",
+    "task_id",
+    "dag_id",
+  ],
+  title: "XComResponse",
+  description: "Serializer for a xcom item.",
+} as const;
+
 export const $XComResponseNative = {
   properties: {
     key: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 1f14639496d..d8cb33bceec 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -150,6 +150,10 @@ import type {
   PostPoolsResponse,
   GetProvidersData,
   GetProvidersResponse,
+  GetXcomEntryData,
+  GetXcomEntryResponse,
+  GetXcomEntriesData,
+  GetXcomEntriesResponse,
   GetTasksData,
   GetTasksResponse,
   GetTaskData,
@@ -164,8 +168,6 @@ import type {
   GetVariablesResponse,
   PostVariableData,
   PostVariableResponse,
-  GetXcomEntryData,
-  GetXcomEntryResponse,
   GetHealthResponse,
   GetVersionResponse,
 } from "./types.gen";
@@ -2608,6 +2610,92 @@ export class ProviderService {
   }
 }
 
+export class XcomService {
+  /**
+   * Get Xcom Entry
+   * Get an XCom entry.
+   * @param data The data for the request.
+   * @param data.dagId Path parameter `dag_id`.
+   * @param data.taskId Path parameter `task_id`.
+   * @param data.dagRunId Path parameter `dag_run_id`.
+   * @param data.xcomKey Path parameter `xcom_key`.
+   * @param data.mapIndex Query parameter `map_index`.
+   * @param data.deserialize Query parameter `deserialize`.
+   * @param data.stringify Query parameter `stringify`.
+   * @returns unknown Successful Response
+   * @throws ApiError
+   */
+  public static getXcomEntry(
+    data: GetXcomEntryData,
+  ): CancelablePromise<GetXcomEntryResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: 
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
+      path: {
+        dag_id: data.dagId,
+        task_id: data.taskId,
+        dag_run_id: data.dagRunId,
+        xcom_key: data.xcomKey,
+      },
+      query: {
+        map_index: data.mapIndex,
+        deserialize: data.deserialize,
+        stringify: data.stringify,
+      },
+      errors: {
+        400: "Bad Request",
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+
+  /**
+   * Get Xcom Entries
+   * Get all XCom entries.
+   *
+   * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs.
+   * @param data The data for the request.
+   * @param data.dagId Path parameter `dag_id`.
+   * @param data.dagRunId Path parameter `dag_run_id`.
+   * @param data.taskId Path parameter `task_id`.
+   * @param data.xcomKey Query parameter `xcom_key`.
+   * @param data.mapIndex Query parameter `map_index`.
+   * @param data.limit Query parameter `limit`.
+   * @param data.offset Query parameter `offset`.
+   * @returns XComCollection Successful Response
+   * @throws ApiError
+   */
+  public static getXcomEntries(
+    data: GetXcomEntriesData,
+  ): CancelablePromise<GetXcomEntriesResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: 
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+      path: {
+        dag_id: data.dagId,
+        dag_run_id: data.dagRunId,
+        task_id: data.taskId,
+      },
+      query: {
+        xcom_key: data.xcomKey,
+        map_index: data.mapIndex,
+        limit: data.limit,
+        offset: data.offset,
+      },
+      errors: {
+        400: "Bad Request",
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+}
+
 export class TaskService {
   /**
    * Get Tasks
@@ -2809,49 +2897,6 @@ export class VariableService {
   }
 }
 
-export class XcomService {
-  /**
-   * Get Xcom Entry
-   * Get an XCom entry.
-   * @param data The data for the request.
-   * @param data.dagId
-   * @param data.taskId
-   * @param data.dagRunId
-   * @param data.xcomKey
-   * @param data.mapIndex
-   * @param data.deserialize
-   * @param data.stringify
-   * @returns unknown Successful Response
-   * @throws ApiError
-   */
-  public static getXcomEntry(
-    data: GetXcomEntryData,
-  ): CancelablePromise<GetXcomEntryResponse> {
-    return __request(OpenAPI, {
-      method: "GET",
-      url: 
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
-      path: {
-        dag_id: data.dagId,
-        task_id: data.taskId,
-        dag_run_id: data.dagRunId,
-        xcom_key: data.xcomKey,
-      },
-      query: {
-        map_index: data.mapIndex,
-        deserialize: data.deserialize,
-        stringify: data.stringify,
-      },
-      errors: {
-        400: "Bad Request",
-        401: "Unauthorized",
-        403: "Forbidden",
-        404: "Not Found",
-        422: "Validation Error",
-      },
-    });
-  }
-}
-
 export class MonitorService {
   /**
    * Get Health
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index f130c187c71..bdcce0157dc 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1185,6 +1185,26 @@ export type VersionInfo = {
   git_version: string | null;
 };
 
+/**
+ * List of XCom items: the returned entries together with a total entry count.
+ */
+export type XComCollection = {
+  xcom_entries: Array<XComResponse>;
+  total_entries: number;
+};
+
+/**
+ * Serializer for an XCom item; carries identifying metadata only (no value field).
+ */
+export type XComResponse = {
+  key: string;
+  timestamp: string;
+  logical_date: string;
+  map_index: number;
+  task_id: string;
+  dag_id: string;
+};
+
 /**
  * XCom response serializer with native return type.
  */
@@ -1866,6 +1886,30 @@ export type GetProvidersData = {
 
 export type GetProvidersResponse = ProviderCollectionResponse;
 
+export type GetXcomEntryData = {
+  dagId: string;
+  dagRunId: string;
+  deserialize?: boolean;
+  mapIndex?: number;
+  stringify?: boolean;
+  taskId: string;
+  xcomKey: string;
+};
+
+export type GetXcomEntryResponse = XComResponseNative | XComResponseString;
+
+export type GetXcomEntriesData = {
+  dagId: string;
+  dagRunId: string;
+  limit?: number;
+  mapIndex?: number | null;
+  offset?: number;
+  taskId: string;
+  xcomKey?: string | null;
+};
+
+export type GetXcomEntriesResponse = XComCollection;
+
 export type GetTasksData = {
   dagId: string;
   orderBy?: string;
@@ -1914,18 +1958,6 @@ export type PostVariableData = {
 
 export type PostVariableResponse = VariableResponse;
 
-export type GetXcomEntryData = {
-  dagId: string;
-  dagRunId: string;
-  deserialize?: boolean;
-  mapIndex?: number;
-  stringify?: boolean;
-  taskId: string;
-  xcomKey: string;
-};
-
-export type GetXcomEntryResponse = XComResponseNative | XComResponseString;
-
 export type GetHealthResponse = HealthInfoSchema;
 
 export type GetVersionResponse = VersionInfo;
@@ -3906,6 +3938,68 @@ export type $OpenApiTs = {
       };
     };
   };
+  
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}":
 {
+    get: {
+      req: GetXcomEntryData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: XComResponseNative | XComResponseString;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
+  
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries":
 {
+    get: {
+      req: GetXcomEntriesData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: XComCollection;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dags/{dag_id}/tasks": {
     get: {
       req: GetTasksData;
@@ -4093,37 +4187,6 @@ export type $OpenApiTs = {
       };
     };
   };
-  
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}":
 {
-    get: {
-      req: GetXcomEntryData;
-      res: {
-        /**
-         * Successful Response
-         */
-        200: XComResponseNative | XComResponseString;
-        /**
-         * Bad Request
-         */
-        400: HTTPExceptionResponse;
-        /**
-         * Unauthorized
-         */
-        401: HTTPExceptionResponse;
-        /**
-         * Forbidden
-         */
-        403: HTTPExceptionResponse;
-        /**
-         * Not Found
-         */
-        404: HTTPExceptionResponse;
-        /**
-         * Validation Error
-         */
-        422: HTTPValidationError;
-      };
-    };
-  };
   "/public/monitor/health": {
     get: {
       res: {
diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py 
b/tests/api_fastapi/core_api/routes/public/test_xcom.py
index e0010c79fef..022987125fb 100644
--- a/tests/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py
@@ -21,6 +21,7 @@ from unittest import mock
 import pytest
 
 from airflow.models import XCom
+from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.xcom import BaseXCom, resolve_xcom_backend
@@ -36,13 +37,17 @@ pytestmark = pytest.mark.db_test
 
 TEST_XCOM_KEY = "test_xcom_key"
 TEST_XCOM_VALUE = {"key": "value"}
-TEST_XCOM_KEY3 = "test_xcom_key_non_existing"
+TEST_XCOM_KEY_2 = "test_xcom_key_non_existing"
 
 TEST_DAG_ID = "test-dag-id"
 TEST_TASK_ID = "test-task-id"
 TEST_EXECUTION_DATE = "2005-04-02T00:00:00+00:00"
 
+TEST_DAG_ID_2 = "test-dag-id-2"
+TEST_TASK_ID_2 = "test-task-id-2"
+
 logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE)
+logical_date_formatted = logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ")
 run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
 
 
@@ -92,18 +97,18 @@ class TestXComEndpoint:
     @pytest.fixture(autouse=True)
     def setup(self) -> None:
         self.clear_db()
+        _create_dag_run()
 
     def teardown_method(self) -> None:
         self.clear_db()
 
-    def create_xcom(self, key, value, backend=XCom) -> None:
-        _create_dag_run()
+    def _create_xcom(self, key, value, backend=XCom) -> None:
         _create_xcom(key, value, backend)
 
 
 class TestGetXComEntry(TestXComEndpoint):
     def test_should_respond_200_stringify(self, test_client):
-        self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
+        self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
         response = test_client.get(
             
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}"
         )
@@ -121,7 +126,7 @@ class TestGetXComEntry(TestXComEndpoint):
         }
 
     def test_should_respond_200_native(self, test_client):
-        self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
+        self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
         response = test_client.get(
             
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}?stringify=false"
         )
@@ -140,10 +145,10 @@ class TestGetXComEntry(TestXComEndpoint):
 
     def test_should_raise_404_for_non_existent_xcom(self, test_client):
         response = test_client.get(
-            
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY3}"
+            
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY_2}"
         )
         assert response.status_code == 404
-        assert response.json()["detail"] == f"XCom entry with key: 
`{TEST_XCOM_KEY3}` not found"
+        assert response.json()["detail"] == f"XCom entry with key: 
`{TEST_XCOM_KEY_2}` not found"
 
     @pytest.mark.parametrize(
         "support_deserialize, params, expected_status_or_value",
@@ -191,7 +196,7 @@ class TestGetXComEntry(TestXComEndpoint):
         self, support_deserialize: bool, params: str, 
expected_status_or_value: int | str, test_client
     ):
         XCom = resolve_xcom_backend()
-        self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom)
+        self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom)
 
         url = 
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}"
         with 
mock.patch("airflow.api_fastapi.core_api.routes.public.xcom.XCom", XCom):
@@ -203,3 +208,279 @@ class TestGetXComEntry(TestXComEndpoint):
         else:
             assert response.status_code == 200
             assert response.json()["value"] == expected_status_or_value
+
+
+class TestGetXComEntries(TestXComEndpoint):
+    @pytest.fixture(autouse=True)
+    def setup(self) -> None:  # overrides the base fixture: only clears the DB, tests seed their own DAG runs
+        self.clear_db()
+
+    def test_should_respond_200(self, test_client):
+        self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, 
TEST_TASK_ID)
+        response = test_client.get(
+            
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries"
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+        for xcom_entry in response_data["xcom_entries"]:
+            xcom_entry["timestamp"] = "TIMESTAMP"  # mask the timestamp before comparing with expected_response
+
+        expected_response = {
+            "xcom_entries": [
+                {
+                    "dag_id": TEST_DAG_ID,
+                    "logical_date": logical_date_formatted,
+                    "key": f"{TEST_XCOM_KEY}-0",
+                    "task_id": TEST_TASK_ID,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": -1,
+                },
+                {
+                    "dag_id": TEST_DAG_ID,
+                    "logical_date": logical_date_formatted,
+                    "key": f"{TEST_XCOM_KEY}-1",
+                    "task_id": TEST_TASK_ID,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": -1,
+                },
+            ],
+            "total_entries": 2,
+        }
+        assert response_data == expected_response
+
+    def test_should_respond_200_with_tilde(self, test_client):
+        self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, 
TEST_TASK_ID)
+        self._create_xcom_entries(TEST_DAG_ID_2, run_id, logical_date_parsed, 
TEST_TASK_ID_2)
+
+        response = 
test_client.get("/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries")
+        assert response.status_code == 200
+        response_data = response.json()
+        for xcom_entry in response_data["xcom_entries"]:
+            xcom_entry["timestamp"] = "TIMESTAMP"  # mask the timestamp before comparing with expected_response
+
+        expected_response = {
+            "xcom_entries": [
+                {
+                    "dag_id": TEST_DAG_ID,
+                    "logical_date": logical_date_formatted,
+                    "key": f"{TEST_XCOM_KEY}-0",
+                    "task_id": TEST_TASK_ID,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": -1,
+                },
+                {
+                    "dag_id": TEST_DAG_ID,
+                    "logical_date": logical_date_formatted,
+                    "key": f"{TEST_XCOM_KEY}-1",
+                    "task_id": TEST_TASK_ID,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": -1,
+                },
+                {
+                    "dag_id": TEST_DAG_ID_2,
+                    "logical_date": logical_date_formatted,
+                    "key": f"{TEST_XCOM_KEY}-0",
+                    "task_id": TEST_TASK_ID_2,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": -1,
+                },
+                {
+                    "dag_id": TEST_DAG_ID_2,
+                    "logical_date": logical_date_formatted,
+                    "key": f"{TEST_XCOM_KEY}-1",
+                    "task_id": TEST_TASK_ID_2,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": -1,
+                },
+            ],
+            "total_entries": 4,
+        }
+        assert response_data == expected_response
+
+    @pytest.mark.parametrize("map_index", (0, 1, None))
+    def test_should_respond_200_with_map_index(self, map_index, test_client):
+        self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, 
TEST_TASK_ID, mapped_ti=True)
+
+        response = test_client.get(
+            "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries",
+            params={"map_index": map_index} if map_index is not None else None,
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+
+        if map_index is None:  # no filter: both mapped entries (map_index 0 and 1) are expected
+            expected_entries = [
+                {
+                    "dag_id": TEST_DAG_ID,
+                    "logical_date": logical_date_formatted,
+                    "key": TEST_XCOM_KEY,
+                    "task_id": TEST_TASK_ID,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": idx,
+                }
+                for idx in range(2)
+            ]
+        else:
+            expected_entries = [
+                {
+                    "dag_id": TEST_DAG_ID,
+                    "logical_date": logical_date_formatted,
+                    "key": TEST_XCOM_KEY,
+                    "task_id": TEST_TASK_ID,
+                    "timestamp": "TIMESTAMP",
+                    "map_index": map_index,
+                }
+            ]
+        for xcom_entry in response_data["xcom_entries"]:
+            xcom_entry["timestamp"] = "TIMESTAMP"
+        assert response_data == {
+            "xcom_entries": expected_entries,
+            "total_entries": len(expected_entries),
+        }
+
+    @pytest.mark.parametrize(
+        "key, expected_entries",
+        [
+            (
+                TEST_XCOM_KEY,
+                [
+                    {
+                        "dag_id": TEST_DAG_ID,
+                        "logical_date": logical_date_formatted,
+                        "key": TEST_XCOM_KEY,
+                        "task_id": TEST_TASK_ID,
+                        "timestamp": "TIMESTAMP",
+                        "map_index": 0,
+                    },
+                    {
+                        "dag_id": TEST_DAG_ID,
+                        "logical_date": logical_date_formatted,
+                        "key": TEST_XCOM_KEY,
+                        "task_id": TEST_TASK_ID,
+                        "timestamp": "TIMESTAMP",
+                        "map_index": 1,
+                    },
+                ],
+            ),
+            (f"{TEST_XCOM_KEY}-0", []),
+        ],
+    )
+    def test_should_respond_200_with_xcom_key(self, key, expected_entries, 
test_client):
+        self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, 
TEST_TASK_ID, mapped_ti=True)
+        response = test_client.get(
+            "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries",
+            params={"xcom_key": key} if key is not None else None,
+        )
+
+        assert response.status_code == 200
+        response_data = response.json()
+        for xcom_entry in response_data["xcom_entries"]:
+            xcom_entry["timestamp"] = "TIMESTAMP"
+        assert response_data == {
+            "xcom_entries": expected_entries,
+            "total_entries": len(expected_entries),
+        }
+
+    @provide_session
+    def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id, 
mapped_ti=False, session=None):  # session: presumably injected by @provide_session
+        dag = DagModel(dag_id=dag_id)
+        session.add(dag)
+        dagrun = DagRun(
+            dag_id=dag_id,
+            run_id=run_id,
+            logical_date=logical_date,
+            start_date=logical_date,
+            run_type=DagRunType.MANUAL,
+        )
+        session.add(dagrun)
+        if mapped_ti:
+            for i in [0, 1]:
+                ti = TaskInstance(EmptyOperator(task_id=task_id), 
run_id=run_id, map_index=i)
+                ti.dag_id = dag_id
+                session.add(ti)
+        else:
+            ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id)
+            ti.dag_id = dag_id
+            session.add(ti)
+        session.commit()
+
+        for i in [0, 1]:  # always two XCom rows per call
+            if mapped_ti:
+                key = TEST_XCOM_KEY  # mapped: one shared key, rows differ by map_index
+                map_index = i
+            else:
+                key = f"{TEST_XCOM_KEY}-{i}"  # unmapped: rows differ by key suffix
+                map_index = -1  # value used here for the non-mapped task instance
+
+            XCom.set(
+                key=key,
+                value=TEST_XCOM_VALUE,
+                run_id=run_id,
+                task_id=task_id,
+                dag_id=dag_id,
+                map_index=map_index,
+            )
+
+
+class TestPaginationGetXComEntries(TestXComEndpoint):
+    @pytest.mark.parametrize(
+        "query_params, expected_xcom_ids",
+        [
+            (
+                {"limit": "1"},
+                ["TEST_XCOM_KEY0"],
+            ),
+            (
+                {"limit": "2"},
+                ["TEST_XCOM_KEY0", "TEST_XCOM_KEY1"],
+            ),
+            (
+                {"offset": "5"},
+                [
+                    "TEST_XCOM_KEY5",
+                    "TEST_XCOM_KEY6",
+                    "TEST_XCOM_KEY7",
+                    "TEST_XCOM_KEY8",
+                    "TEST_XCOM_KEY9",
+                ],
+            ),
+            (
+                {"offset": "0"},
+                [
+                    "TEST_XCOM_KEY0",
+                    "TEST_XCOM_KEY1",
+                    "TEST_XCOM_KEY2",
+                    "TEST_XCOM_KEY3",
+                    "TEST_XCOM_KEY4",
+                    "TEST_XCOM_KEY5",
+                    "TEST_XCOM_KEY6",
+                    "TEST_XCOM_KEY7",
+                    "TEST_XCOM_KEY8",
+                    "TEST_XCOM_KEY9",
+                ],
+            ),
+            (
+                {"limit": "1", "offset": "5"},
+                ["TEST_XCOM_KEY5"],
+            ),
+            (
+                {"limit": "1", "offset": "1"},
+                ["TEST_XCOM_KEY1"],
+            ),
+            (
+                {"limit": "2", "offset": "2"},
+                ["TEST_XCOM_KEY2", "TEST_XCOM_KEY3"],
+            ),
+        ],
+    )
+    def test_handle_limit_offset(self, query_params, expected_xcom_ids, 
test_client):
+        for i in range(10):
+            self._create_xcom(f"TEST_XCOM_KEY{i}", TEST_XCOM_VALUE)
+        response = test_client.get(
+            "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries", 
params=query_params
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data["total_entries"] == 10
+        conn_ids = [conn["key"] for conn in response_data["xcom_entries"] if 
conn]
+        assert conn_ids == expected_xcom_ids

Reply via email to