This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c591e5b80 Make XCom display as react json (#40640)
6c591e5b80 is described below

commit 6c591e5b8016a7c46d788a42690a8e176746a967
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon Jul 22 06:58:32 2024 +0200

    Make XCom display as react json (#40640)
    
    * Enable proper JSON view in Xcom display as well
    
    * Review feedback
    
    * Implement a native endpoint for XCom API to prevent retrieval as Python 
JSON string
    
    * Fix pytests
---
 airflow/api_connexion/endpoints/xcom_endpoint.py   | 13 ++++++--
 airflow/api_connexion/openapi/v1.yaml              | 24 ++++++++++++--
 airflow/api_connexion/schemas/xcom_schema.py       | 15 ++++++---
 airflow/www/static/js/api/useTaskXcom.ts           |  2 +-
 .../js/dag/details/taskInstance/Xcom/XcomEntry.tsx | 19 +++++++++--
 airflow/www/static/js/types/api-generated.ts       | 18 ++++++++--
 .../api_connexion/endpoints/test_xcom_endpoint.py  | 38 +++++++++++++++++++---
 tests/api_connexion/schemas/test_xcom_schema.py    |  6 ++--
 8 files changed, 113 insertions(+), 22 deletions(-)

diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py 
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 68a9ab2d45..59fa9f5aca 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -25,7 +25,12 @@ from sqlalchemy import and_, select
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.parameters import check_limit, format_parameters
-from airflow.api_connexion.schemas.xcom_schema import XComCollection, 
xcom_collection_schema, xcom_schema
+from airflow.api_connexion.schemas.xcom_schema import (
+    XComCollection,
+    xcom_collection_schema,
+    xcom_schema_native,
+    xcom_schema_string,
+)
 from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.models import DagRun as DR, XCom
 from airflow.settings import conf
@@ -88,6 +93,7 @@ def get_xcom_entry(
     xcom_key: str,
     map_index: int = -1,
     deserialize: bool = False,
+    stringify: bool = True,
     session: Session = NEW_SESSION,
 ) -> APIResponse:
     """Get an XCom entry."""
@@ -119,4 +125,7 @@ def get_xcom_entry(
         stub.value = XCom.deserialize_value(stub)
         item = stub
 
-    return xcom_schema.dump(item)
+    if stringify:
+        return xcom_schema_string.dump(item)
+
+    return xcom_schema_native.dump(item)
diff --git a/airflow/api_connexion/openapi/v1.yaml 
b/airflow/api_connexion/openapi/v1.yaml
index 798f04ddb1..a620562f88 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1904,6 +1904,19 @@ paths:
             This parameter is not meaningful when using the default XCom 
backend.
 
             *New in version 2.4.0*
+        - in: query
+          name: stringify
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: |
+            Whether to convert the XCom value to be a string. XCom values can 
be of Any data type.
+
+            If set to true (default) the Any value will be returned as string, 
e.g. a Python representation
+            of a dict. If set to false it will return the raw data as dict, 
list, string or whatever was stored.
+
+            *New in version 2.10.0*
       responses:
         "200":
           description: Success.
@@ -3922,8 +3935,15 @@ components:
         - type: object
           properties:
             value:
-              type: string
-              description: The value
+              anyOf:
+                - type: string
+                - type: number
+                - type: integer
+                - type: boolean
+                - type: array
+                  items: {}
+                - type: object
+              description: The value(s),
 
     # Python objects
     # Based on
diff --git a/airflow/api_connexion/schemas/xcom_schema.py 
b/airflow/api_connexion/schemas/xcom_schema.py
index 5894db8b1a..625f05bd14 100644
--- a/airflow/api_connexion/schemas/xcom_schema.py
+++ b/airflow/api_connexion/schemas/xcom_schema.py
@@ -40,10 +40,16 @@ class XComCollectionItemSchema(SQLAlchemySchema):
     dag_id = auto_field()
 
 
-class XComSchema(XComCollectionItemSchema):
-    """XCom schema."""
+class XComSchemaNative(XComCollectionItemSchema):
+    """XCom schema with native return type."""
 
-    value = auto_field()
+    value = fields.Raw()
+
+
+class XComSchemaString(XComCollectionItemSchema):
+    """XCom schema forced to be string."""
+
+    value = fields.String()
 
 
 class XComCollection(NamedTuple):
@@ -60,6 +66,7 @@ class XComCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
-xcom_schema = XComSchema()
+xcom_schema_native = XComSchemaNative()
+xcom_schema_string = XComSchemaString()
 xcom_collection_item_schema = XComCollectionItemSchema()
 xcom_collection_schema = XComCollectionSchema()
diff --git a/airflow/www/static/js/api/useTaskXcom.ts 
b/airflow/www/static/js/api/useTaskXcom.ts
index 1faa19005a..403233285e 100644
--- a/airflow/www/static/js/api/useTaskXcom.ts
+++ b/airflow/www/static/js/api/useTaskXcom.ts
@@ -63,7 +63,7 @@ export const useTaskXcomEntry = ({
           .replace("_DAG_RUN_ID_", dagRunId)
           .replace("_TASK_ID_", taskId)
           .replace("_XCOM_KEY_", xcomKey),
-        { params: { map_index: mapIndex } }
+        { params: { map_index: mapIndex, stringify: false } }
       ),
     {
       enabled: !!xcomKey,
diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx 
b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
index 8523181ae2..2e9ba769ae 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react";
+import { Alert, AlertIcon, Spinner, Td, Tr } from "@chakra-ui/react";
 import React from "react";
 
 import { useTaskXcomEntry } from "src/api";
 import ErrorAlert from "src/components/ErrorAlert";
+import RenderedJsonField from "src/components/RenderedJsonField";
 import type { Dag, DagRun, TaskInstance } from "src/types";
 
 interface Props {
@@ -54,18 +55,30 @@ const XcomEntry = ({
     tryNumber: tryNumber || 1,
   });
 
-  let content = <Text fontFamily="monospace">{xcom?.value}</Text>;
+  let content = null;
   if (isLoading) {
     content = <Spinner />;
   } else if (error) {
     content = <ErrorAlert error={error} />;
-  } else if (!xcom) {
+  } else if (!xcom || !xcom.value) {
     content = (
       <Alert status="info">
         <AlertIcon />
         No value found for XCom key
       </Alert>
     );
+  } else {
+    let xcomString = "";
+    if (typeof xcom.value !== "string") {
+      try {
+        xcomString = JSON.stringify(xcom.value);
+      } catch (e) {
+        // skip
+      }
+    } else {
+      xcomString = xcom.value as string;
+    }
+    content = <RenderedJsonField content={xcomString} />;
   }
 
   return (
diff --git a/airflow/www/static/js/types/api-generated.ts 
b/airflow/www/static/js/types/api-generated.ts
index af37f361ea..f3948c545e 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1599,8 +1599,13 @@ export interface components {
     } & components["schemas"]["CollectionInfo"];
     /** @description Full representations of XCom entry. */
     XCom: components["schemas"]["XComCollectionItem"] & {
-      /** @description The value */
-      value?: string;
+      /** @description The value(s), */
+      value?: Partial<string> &
+        Partial<number> &
+        Partial<number> &
+        Partial<boolean> &
+        Partial<unknown[]> &
+        Partial<{ [key: string]: unknown }>;
     };
     /**
      * @description DAG details.
@@ -4439,6 +4444,15 @@ export interface operations {
          * *New in version 2.4.0*
          */
         deserialize?: boolean;
+        /**
+         * Whether to convert the XCom value to be a string. XCom values can 
be of Any data type.
+         *
+         * If set to true (default) the Any value will be returned as string, 
e.g. a Python representation
+         * of a dict. If set to false it will return the raw data as dict, 
list, string or whatever was stored.
+         *
+         * *New in version 2.10.0*
+         */
+        stringify?: boolean;
       };
     };
     responses: {
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py 
b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 1e4dbb5678..318e97842d 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -122,14 +122,14 @@ class TestXComEndpoint:
 
 
 class TestGetXComEntry(TestXComEndpoint):
-    def test_should_respond_200(self):
+    def test_should_respond_200_stringify(self):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         execution_date = "2005-04-02T00:00:00+00:00"
         xcom_key = "test-xcom-key"
         execution_date_parsed = parse_execution_date(execution_date)
         run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
execution_date_parsed)
-        self._create_xcom_entry(dag_id, run_id, execution_date_parsed, 
task_id, xcom_key)
+        self._create_xcom_entry(dag_id, run_id, execution_date_parsed, 
task_id, xcom_key, {"key": "value"})
         response = self.client.get(
             
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
             environ_overrides={"REMOTE_USER": "test"},
@@ -145,7 +145,33 @@ class TestGetXComEntry(TestXComEndpoint):
             "task_id": task_id,
             "map_index": -1,
             "timestamp": "TIMESTAMP",
-            "value": "TEST_VALUE",
+            "value": "{'key': 'value'}",
+        }
+
+    def test_should_respond_200_native(self):
+        dag_id = "test-dag-id"
+        task_id = "test-task-id"
+        execution_date = "2005-04-02T00:00:00+00:00"
+        xcom_key = "test-xcom-key"
+        execution_date_parsed = parse_execution_date(execution_date)
+        run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
execution_date_parsed)
+        self._create_xcom_entry(dag_id, run_id, execution_date_parsed, 
task_id, xcom_key, {"key": "value"})
+        response = self.client.get(
+            
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false",
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert 200 == response.status_code
+
+        current_data = response.json
+        current_data["timestamp"] = "TIMESTAMP"
+        assert current_data == {
+            "dag_id": dag_id,
+            "execution_date": execution_date,
+            "key": xcom_key,
+            "task_id": task_id,
+            "map_index": -1,
+            "timestamp": "TIMESTAMP",
+            "value": {"key": "value"},
         }
 
     def test_should_raise_404_for_non_existent_xcom(self):
@@ -192,7 +218,9 @@ class TestGetXComEntry(TestXComEndpoint):
         )
         assert response.status_code == 403
 
-    def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, 
xcom_key, *, backend=XCom):
+    def _create_xcom_entry(
+        self, dag_id, run_id, execution_date, task_id, xcom_key, 
xcom_value="TEST_VALUE", *, backend=XCom
+    ):
         with create_session() as session:
             dagrun = DagRun(
                 dag_id=dag_id,
@@ -207,7 +235,7 @@ class TestGetXComEntry(TestXComEndpoint):
             session.add(ti)
         backend.set(
             key=xcom_key,
-            value="TEST_VALUE",
+            value=xcom_value,
             run_id=run_id,
             task_id=task_id,
             dag_id=dag_id,
diff --git a/tests/api_connexion/schemas/test_xcom_schema.py 
b/tests/api_connexion/schemas/test_xcom_schema.py
index eb3220626d..7a10b7e7a4 100644
--- a/tests/api_connexion/schemas/test_xcom_schema.py
+++ b/tests/api_connexion/schemas/test_xcom_schema.py
@@ -25,7 +25,7 @@ from airflow.api_connexion.schemas.xcom_schema import (
     XComCollection,
     xcom_collection_item_schema,
     xcom_collection_schema,
-    xcom_schema,
+    xcom_schema_string,
 )
 from airflow.models import DagRun, XCom
 from airflow.utils.dates import parse_execution_date
@@ -199,7 +199,7 @@ class TestXComSchema:
             value=pickle.dumps(b"test_binary"),
         )
         xcom_model = session.query(XCom).first()
-        deserialized_xcom = xcom_schema.dump(xcom_model)
+        deserialized_xcom = xcom_schema_string.dump(xcom_model)
         assert deserialized_xcom == {
             "key": "test_key",
             "timestamp": self.default_time,
@@ -220,7 +220,7 @@ class TestXComSchema:
             "dag_id": "test_dag",
             "value": b"test_binary",
         }
-        result = xcom_schema.load(xcom_dump)
+        result = xcom_schema_string.load(xcom_dump)
         assert result == {
             "key": "test_key",
             "timestamp": self.default_time_parsed,

Reply via email to