This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit be86dd37ebc393ef5eee721b15e172e4e1bfd848
Author: Huy Duong <[email protected]>
AuthorDate: Mon Dec 4 15:59:37 2023 +0000

    Add XCom tab to Grid (#35719)
    
    * Add XCom tab to Grid
    
    * Combine showLogs and showXcom logic evaluation to isIndividualTaskInstance
    
    * Remove link to /xcom page from UI grid view
    
    * Use consistent naming to distinguish XcomCollection and XcomEntry
    
    * Refactor boolean vars
    
    (cherry picked from commit 77c01031d6c569d26f6fabd331597b7e87274baa)
---
 airflow/www/static/js/api/index.ts                 |   3 +
 airflow/www/static/js/api/useTaskXcom.ts           |  71 +++++++++++++
 airflow/www/static/js/dag/details/index.tsx        |  71 ++++++++++---
 .../www/static/js/dag/details/taskInstance/Nav.tsx |   3 -
 .../js/dag/details/taskInstance/Xcom/XcomEntry.tsx |  82 +++++++++++++++
 .../js/dag/details/taskInstance/Xcom/index.tsx     | 116 +++++++++++++++++++++
 airflow/www/templates/airflow/dag.html             |   3 +-
 7 files changed, 329 insertions(+), 20 deletions(-)

diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index 782a4f99a1..6369a819d2 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -48,6 +48,7 @@ import usePools from "./usePools";
 import useDags from "./useDags";
 import useDagRuns from "./useDagRuns";
 import useHistoricalMetricsData from "./useHistoricalMetricsData";
+import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";
 
 axios.interceptors.request.use((config) => {
   config.paramsSerializer = {
@@ -91,4 +92,6 @@ export {
   useTaskInstance,
   useUpstreamDatasetEvents,
   useHistoricalMetricsData,
+  useTaskXcomEntry,
+  useTaskXcomCollection,
 };
diff --git a/airflow/www/static/js/api/useTaskXcom.ts b/airflow/www/static/js/api/useTaskXcom.ts
new file mode 100644
index 0000000000..1faa19005a
--- /dev/null
+++ b/airflow/www/static/js/api/useTaskXcom.ts
@@ -0,0 +1,71 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import type { API } from "src/types";
+import { getMetaValue } from "src/utils";
+import { useQuery } from "react-query";
+import axios, { AxiosResponse } from "axios";
+
+// tryNumber is not required to get XCom keys or values but is used
+// in query key so refetch will occur if new tries are available
+interface TaskXcomCollectionProps extends API.GetXcomEntriesVariables {
+  tryNumber: number;
+}
+interface TaskXcomProps extends API.GetXcomEntryVariables {
+  tryNumber: number;
+}
+
+export const useTaskXcomCollection = ({
+  dagId,
+  dagRunId,
+  taskId,
+  mapIndex,
+  tryNumber,
+}: TaskXcomCollectionProps) =>
+  useQuery(["taskXcoms", dagId, dagRunId, taskId, mapIndex, tryNumber], () =>
+    axios.get<AxiosResponse, API.XComCollection>(
+      getMetaValue("task_xcom_entries_api")
+        .replace("_DAG_RUN_ID_", dagRunId)
+        .replace("_TASK_ID_", taskId),
+      { params: { map_index: mapIndex } }
+    )
+  );
+
+export const useTaskXcomEntry = ({
+  dagId,
+  dagRunId,
+  taskId,
+  mapIndex,
+  xcomKey,
+  tryNumber,
+}: TaskXcomProps) =>
+  useQuery(
+    ["taskXcom", dagId, dagRunId, taskId, mapIndex, xcomKey, tryNumber],
+    () =>
+      axios.get<AxiosResponse, API.XCom>(
+        getMetaValue("task_xcom_entry_api")
+          .replace("_DAG_RUN_ID_", dagRunId)
+          .replace("_TASK_ID_", taskId)
+          .replace("_XCOM_KEY_", xcomKey),
+        { params: { map_index: mapIndex } }
+      ),
+    {
+      enabled: !!xcomKey,
+    }
+  );
diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx
index b476d61950..3c555c701e 100644
--- a/airflow/www/static/js/dag/details/index.tsx
+++ b/airflow/www/static/js/dag/details/index.tsx
@@ -39,6 +39,7 @@ import {
   MdReorder,
   MdCode,
   MdOutlineViewTimeline,
+  MdSyncAlt,
 } from "react-icons/md";
 import { BiBracket } from "react-icons/bi";
 import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper";
@@ -58,6 +59,7 @@ import ClearRun from "./dagRun/ClearRun";
 import MarkRunAs from "./dagRun/MarkRunAs";
 import ClearInstance from "./taskInstance/taskActions/ClearInstance";
 import MarkInstanceAs from "./taskInstance/taskActions/MarkInstanceAs";
+import XcomCollection from "./taskInstance/Xcom";
 
 const dagId = getMetaValue("dag_id")!;
 
@@ -80,6 +82,8 @@ const tabToIndex = (tab?: string) => {
     case "logs":
     case "mapped_tasks":
       return 4;
+    case "xcom":
+      return 5;
     case "details":
     default:
       return 0;
@@ -89,8 +93,8 @@ const tabToIndex = (tab?: string) => {
 const indexToTab = (
   index: number,
   taskId: string | null,
-  showLogs: boolean,
-  showMappedTasks: boolean
+  isTaskInstance: boolean,
+  isMappedTaskSummary: boolean
 ) => {
   switch (index) {
     case 1:
@@ -100,8 +104,11 @@ const indexToTab = (
     case 3:
       return "code";
     case 4:
-      if (showMappedTasks) return "mapped_tasks";
-      if (showLogs) return "logs";
+      if (isMappedTaskSummary) return "mapped_tasks";
+      if (isTaskInstance) return "logs";
+      return undefined;
+    case 5:
+      if (isTaskInstance) return "xcom";
       return undefined;
     case 0:
     default:
@@ -124,7 +131,6 @@ const Details = ({
   } = useSelection();
   const isDag = !runId && !taskId;
   const isDagRun = runId && !taskId;
-  const isTaskInstance = taskId && runId;
 
   const {
     data: { dagRuns, groups },
@@ -132,12 +138,21 @@ const Details = ({
   const group = getTask({ taskId, task: groups });
   const children = group?.children;
   const isMapped = group?.isMapped;
-
-  const isMappedTaskSummary = isMapped && mapIndex === undefined && taskId;
   const isGroup = !!children;
-  const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary;
-  const showLogs = !!(isTaskInstance && !isGroupOrMappedTaskSummary);
-  const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary && !isGroup);
+
+  const isMappedTaskSummary = !!(
+    taskId &&
+    runId &&
+    !isGroup &&
+    isMapped &&
+    mapIndex === undefined
+  );
+  const isTaskInstance = !!(
+    taskId &&
+    runId &&
+    !isGroup &&
+    !isMappedTaskSummary
+  );
 
   const [searchParams, setSearchParams] = useSearchParams();
   const tab = searchParams.get(TAB_PARAM) || undefined;
@@ -146,12 +161,17 @@ const Details = ({
   const onChangeTab = useCallback(
     (index: number) => {
       const params = new URLSearchParamsWrapper(searchParams);
-      const newTab = indexToTab(index, taskId, showLogs, showMappedTasks);
+      const newTab = indexToTab(
+        index,
+        taskId,
+        isTaskInstance,
+        isMappedTaskSummary
+      );
       if (newTab) params.set(TAB_PARAM, newTab);
       else params.delete(TAB_PARAM);
       setSearchParams(params);
     },
-    [setSearchParams, searchParams, showLogs, showMappedTasks, taskId]
+    [setSearchParams, searchParams, isTaskInstance, isMappedTaskSummary, taskId]
   );
 
   useEffect(() => {
@@ -252,7 +272,7 @@ const Details = ({
               Code
             </Text>
           </Tab>
-          {showLogs && (
+          {isTaskInstance && (
             <Tab>
               <MdReorder size={16} />
               <Text as="strong" ml={1}>
@@ -260,7 +280,7 @@ const Details = ({
               </Text>
             </Tab>
           )}
-          {showMappedTasks && (
+          {isMappedTaskSummary && (
             <Tab>
               <BiBracket size={16} />
               <Text as="strong" ml={1}>
@@ -268,6 +288,14 @@ const Details = ({
               </Text>
             </Tab>
           )}
+          {isTaskInstance && (
+            <Tab>
+              <MdSyncAlt size={16} />
+              <Text as="strong" ml={1}>
+                XCom
+              </Text>
+            </Tab>
+          )}
         </TabList>
         <TabPanels height="100%">
           <TabPanel height="100%">
@@ -304,7 +332,7 @@ const Details = ({
           <TabPanel height="100%">
             <DagCode />
           </TabPanel>
-          {showLogs && run && (
+          {isTaskInstance && run && (
             <TabPanel
               pt={mapIndex !== undefined ? "0px" : undefined}
               height="100%"
@@ -324,7 +352,7 @@ const Details = ({
               />
             </TabPanel>
           )}
-          {showMappedTasks && (
+          {isMappedTaskSummary && (
             <TabPanel height="100%">
               <MappedInstances
                 dagId={dagId}
@@ -336,6 +364,17 @@ const Details = ({
               />
             </TabPanel>
           )}
+          {isTaskInstance && (
+            <TabPanel height="100%">
+              <XcomCollection
+                dagId={dagId}
+                dagRunId={runId}
+                taskId={taskId}
+                mapIndex={mapIndex}
+                tryNumber={instance?.tryNumber}
+              />
+            </TabPanel>
+          )}
         </TabPanels>
       </Tabs>
     </Flex>
diff --git a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
index 3541240ce3..1e5d84d8ba 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
@@ -30,7 +30,6 @@ const isK8sExecutor = getMetaValue("k8s_or_k8scelery_executor") === "True";
 const taskInstancesUrl = getMetaValue("task_instances_list_url");
 const renderedK8sUrl = getMetaValue("rendered_k8s_url");
 const renderedTemplatesUrl = getMetaValue("rendered_templates_url");
-const xcomUrl = getMetaValue("xcom_url");
 const taskUrl = getMetaValue("task_url");
 const gridUrl = getMetaValue("grid_url");
 
@@ -52,7 +51,6 @@ const Nav = forwardRef<HTMLDivElement, Props>(
     });
     const detailsLink = `${taskUrl}&${params}`;
     const renderedLink = `${renderedTemplatesUrl}&${params}`;
-    const xcomLink = `${xcomUrl}&${params}`;
     const k8sLink = `${renderedK8sUrl}&${params}`;
     const listParams = new URLSearchParamsWrapper({
       _flt_3_dag_id: dagId,
@@ -88,7 +86,6 @@ const Nav = forwardRef<HTMLDivElement, Props>(
             {isSubDag && (
               <LinkButton href={subDagLink}>Zoom into SubDag</LinkButton>
             )}
-            <LinkButton href={xcomLink}>XCom</LinkButton>
           </>
         )}
         <LinkButton
diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
new file mode 100644
index 0000000000..59e3d06505
--- /dev/null
+++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
@@ -0,0 +1,82 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react";
+import React from "react";
+import { useTaskXcomEntry } from "src/api";
+import type { Dag, DagRun, TaskInstance } from "src/types";
+
+interface Props {
+  dagId: Dag["id"];
+  dagRunId: DagRun["runId"];
+  taskId: TaskInstance["taskId"];
+  mapIndex?: TaskInstance["mapIndex"];
+  xcomKey: string;
+  tryNumber: TaskInstance["tryNumber"];
+}
+
+const XcomEntry = ({
+  dagId,
+  dagRunId,
+  taskId,
+  mapIndex,
+  xcomKey,
+  tryNumber,
+}: Props) => {
+  const {
+    data: xcom,
+    isLoading,
+    error,
+  } = useTaskXcomEntry({
+    dagId,
+    dagRunId,
+    taskId,
+    mapIndex,
+    xcomKey,
+    tryNumber: tryNumber || 1,
+  });
+
+  let content = <Text fontFamily="monospace">{xcom?.value}</Text>;
+  if (isLoading) {
+    content = <Spinner />;
+  } else if (error) {
+    content = (
+      <Alert status="error">
+        <AlertIcon />
+        Error loading XCom entry
+      </Alert>
+    );
+  } else if (!xcom) {
+    content = (
+      <Alert status="info">
+        <AlertIcon />
+        No value found for XCom key
+      </Alert>
+    );
+  }
+
+  return (
+    <Tr>
+      <Td>{xcomKey}</Td>
+      <Td>{content}</Td>
+    </Tr>
+  );
+};
+
+export default XcomEntry;
diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx
new file mode 100644
index 0000000000..065f9ff497
--- /dev/null
+++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx
@@ -0,0 +1,116 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { useRef } from "react";
+import type { Dag, DagRun, TaskInstance } from "src/types";
+import {
+  Table,
+  Text,
+  Thead,
+  Tbody,
+  Tr,
+  Td,
+  Spinner,
+  Alert,
+  AlertIcon,
+  Box,
+} from "@chakra-ui/react";
+import { useTaskXcomCollection } from "src/api";
+import { useOffsetTop } from "src/utils";
+import XcomEntry from "./XcomEntry";
+
+interface Props {
+  dagId: Dag["id"];
+  dagRunId: DagRun["runId"];
+  taskId: TaskInstance["taskId"];
+  mapIndex?: TaskInstance["mapIndex"];
+  tryNumber: TaskInstance["tryNumber"];
+}
+
+const XcomCollection = ({
+  dagId,
+  dagRunId,
+  taskId,
+  mapIndex,
+  tryNumber,
+}: Props) => {
+  const taskXcomRef = useRef<HTMLDivElement>(null);
+  const offsetTop = useOffsetTop(taskXcomRef);
+
+  const {
+    data: xcomCollection,
+    isLoading,
+    error,
+  } = useTaskXcomCollection({
+    dagId,
+    dagRunId,
+    taskId,
+    mapIndex,
+    tryNumber: tryNumber || 1,
+  });
+
+  return (
+    <Box
+      ref={taskXcomRef}
+      height="100%"
+      maxHeight={`calc(100% - ${offsetTop}px)`}
+      overflowY="auto"
+    >
+      {isLoading && <Spinner size="xl" thickness="4px" speed="0.65s" />}
+      {!!error && (
+        <Alert status="error" marginBottom="10px">
+          <AlertIcon />
+          An error occurred while fetching task XCom.
+        </Alert>
+      )}
+      {xcomCollection &&
+        (xcomCollection.totalEntries === 0 ? (
+          <Text>No XCom</Text>
+        ) : (
+          <Table variant="striped">
+            <Thead>
+              <Tr>
+                <Td>
+                  <Text as="b">Key</Text>
+                </Td>
+                <Td>
+                  <Text as="b">Value</Text>
+                </Td>
+              </Tr>
+            </Thead>
+            <Tbody>
+              {xcomCollection.xcomEntries?.map((xcomEntry) => (
+                <XcomEntry
+                  key={xcomEntry.key}
+                  dagId={dagId}
+                  dagRunId={dagRunId}
+                  taskId={taskId}
+                  mapIndex={mapIndex}
+                  xcomKey={xcomEntry.key || ""}
+                  tryNumber={tryNumber}
+                />
+              ))}
+            </Tbody>
+          </Table>
+        ))}
+    </Box>
+  );
+};
+
+export default XcomCollection;
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 435dacb50c..8d39d05a40 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -63,13 +63,14 @@
   <meta name="graph_url" content="{{ url_for('Airflow.graph', dag_id=dag.dag_id, root=root) }}">
   <meta name="task_url" content="{{ url_for('Airflow.task', dag_id=dag.dag_id) }}">
   <meta name="log_url" content="{{ url_for('Airflow.log', dag_id=dag.dag_id) }}">
-  <meta name="xcom_url" content="{{ url_for('Airflow.xcom', dag_id=dag.dag_id) }}">
   <meta name="rendered_templates_url" content="{{ url_for('Airflow.rendered_templates', dag_id=dag.dag_id) }}">
   <meta name="rendered_k8s_url" content="{{ url_for('Airflow.rendered_k8s', dag_id=dag.dag_id) }}">
   <meta name="task_instances_list_url" content="{{ url_for('TaskInstanceModelView.list') }}">
   <meta name="tag_index_url" content="{{ url_for('Airflow.index', tags='_TAG_NAME_') }}">
   <meta name="mapped_instances_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_mapped_task_instances', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
   <meta name="task_log_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_log_endpoint_get_log', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', task_try_number='-1') }}">
+  <meta name="task_xcom_entries_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_xcom_endpoint_get_xcom_entries', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
+  <meta name="task_xcom_entry_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_xcom_endpoint_get_xcom_entry', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', xcom_key='_XCOM_KEY_') }}">
   <meta name="upstream_dataset_events_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_upstream_dataset_events', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_') }}">
   <meta name="task_instance_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_task_instance', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
   <meta name="set_task_instance_note" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_set_task_instance_note', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_' ) }}">

Reply via email to