This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 77c01031d6 Add XCom tab to Grid (#35719)
77c01031d6 is described below
commit 77c01031d6c569d26f6fabd331597b7e87274baa
Author: Huy Duong <[email protected]>
AuthorDate: Mon Dec 4 15:59:37 2023 +0000
Add XCom tab to Grid (#35719)
* Add XCom tab to Grid
* Combine showLogs and showXcom logic evaluation to isIndividualTaskInstance
* Remove link to /xcom page from UI grid view
* Use consistent naming to distinguish XcomCollection and XcomEntry
* Refactor boolean vars
---
airflow/www/static/js/api/index.ts | 3 +
airflow/www/static/js/api/useTaskXcom.ts | 71 +++++++++++++
airflow/www/static/js/dag/details/index.tsx | 71 ++++++++++---
.../www/static/js/dag/details/taskInstance/Nav.tsx | 3 -
.../js/dag/details/taskInstance/Xcom/XcomEntry.tsx | 82 +++++++++++++++
.../js/dag/details/taskInstance/Xcom/index.tsx | 116 +++++++++++++++++++++
airflow/www/templates/airflow/dag.html | 3 +-
7 files changed, 329 insertions(+), 20 deletions(-)
diff --git a/airflow/www/static/js/api/index.ts
b/airflow/www/static/js/api/index.ts
index 782a4f99a1..6369a819d2 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -48,6 +48,7 @@ import usePools from "./usePools";
import useDags from "./useDags";
import useDagRuns from "./useDagRuns";
import useHistoricalMetricsData from "./useHistoricalMetricsData";
+import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";
axios.interceptors.request.use((config) => {
config.paramsSerializer = {
@@ -91,4 +92,6 @@ export {
useTaskInstance,
useUpstreamDatasetEvents,
useHistoricalMetricsData,
+ useTaskXcomEntry,
+ useTaskXcomCollection,
};
diff --git a/airflow/www/static/js/api/useTaskXcom.ts
b/airflow/www/static/js/api/useTaskXcom.ts
new file mode 100644
index 0000000000..1faa19005a
--- /dev/null
+++ b/airflow/www/static/js/api/useTaskXcom.ts
@@ -0,0 +1,71 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import type { API } from "src/types";
+import { getMetaValue } from "src/utils";
+import { useQuery } from "react-query";
+import axios, { AxiosResponse } from "axios";
+
+// tryNumber is not required to get XCom keys or values but is used
+// in query key so refetch will occur if new tries are available
+interface TaskXcomCollectionProps extends API.GetXcomEntriesVariables {
+ tryNumber: number;
+}
+interface TaskXcomProps extends API.GetXcomEntryVariables {
+ tryNumber: number;
+}
+
+export const useTaskXcomCollection = ({
+ dagId,
+ dagRunId,
+ taskId,
+ mapIndex,
+ tryNumber,
+}: TaskXcomCollectionProps) =>
+ useQuery(["taskXcoms", dagId, dagRunId, taskId, mapIndex, tryNumber], () =>
+ axios.get<AxiosResponse, API.XComCollection>(
+ getMetaValue("task_xcom_entries_api")
+ .replace("_DAG_RUN_ID_", dagRunId)
+ .replace("_TASK_ID_", taskId),
+ { params: { map_index: mapIndex } }
+ )
+ );
+
+export const useTaskXcomEntry = ({
+ dagId,
+ dagRunId,
+ taskId,
+ mapIndex,
+ xcomKey,
+ tryNumber,
+}: TaskXcomProps) =>
+ useQuery(
+ ["taskXcom", dagId, dagRunId, taskId, mapIndex, xcomKey, tryNumber],
+ () =>
+ axios.get<AxiosResponse, API.XCom>(
+ getMetaValue("task_xcom_entry_api")
+ .replace("_DAG_RUN_ID_", dagRunId)
+ .replace("_TASK_ID_", taskId)
+ .replace("_XCOM_KEY_", xcomKey),
+ { params: { map_index: mapIndex } }
+ ),
+ {
+ enabled: !!xcomKey,
+ }
+ );
diff --git a/airflow/www/static/js/dag/details/index.tsx
b/airflow/www/static/js/dag/details/index.tsx
index b476d61950..3c555c701e 100644
--- a/airflow/www/static/js/dag/details/index.tsx
+++ b/airflow/www/static/js/dag/details/index.tsx
@@ -39,6 +39,7 @@ import {
MdReorder,
MdCode,
MdOutlineViewTimeline,
+ MdSyncAlt,
} from "react-icons/md";
import { BiBracket } from "react-icons/bi";
import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper";
@@ -58,6 +59,7 @@ import ClearRun from "./dagRun/ClearRun";
import MarkRunAs from "./dagRun/MarkRunAs";
import ClearInstance from "./taskInstance/taskActions/ClearInstance";
import MarkInstanceAs from "./taskInstance/taskActions/MarkInstanceAs";
+import XcomCollection from "./taskInstance/Xcom";
const dagId = getMetaValue("dag_id")!;
@@ -80,6 +82,8 @@ const tabToIndex = (tab?: string) => {
case "logs":
case "mapped_tasks":
return 4;
+ case "xcom":
+ return 5;
case "details":
default:
return 0;
@@ -89,8 +93,8 @@ const tabToIndex = (tab?: string) => {
const indexToTab = (
index: number,
taskId: string | null,
- showLogs: boolean,
- showMappedTasks: boolean
+ isTaskInstance: boolean,
+ isMappedTaskSummary: boolean
) => {
switch (index) {
case 1:
@@ -100,8 +104,11 @@ const indexToTab = (
case 3:
return "code";
case 4:
- if (showMappedTasks) return "mapped_tasks";
- if (showLogs) return "logs";
+ if (isMappedTaskSummary) return "mapped_tasks";
+ if (isTaskInstance) return "logs";
+ return undefined;
+ case 5:
+ if (isTaskInstance) return "xcom";
return undefined;
case 0:
default:
@@ -124,7 +131,6 @@ const Details = ({
} = useSelection();
const isDag = !runId && !taskId;
const isDagRun = runId && !taskId;
- const isTaskInstance = taskId && runId;
const {
data: { dagRuns, groups },
@@ -132,12 +138,21 @@ const Details = ({
const group = getTask({ taskId, task: groups });
const children = group?.children;
const isMapped = group?.isMapped;
-
- const isMappedTaskSummary = isMapped && mapIndex === undefined && taskId;
const isGroup = !!children;
- const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary;
- const showLogs = !!(isTaskInstance && !isGroupOrMappedTaskSummary);
- const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary &&
!isGroup);
+
+ const isMappedTaskSummary = !!(
+ taskId &&
+ runId &&
+ !isGroup &&
+ isMapped &&
+ mapIndex === undefined
+ );
+ const isTaskInstance = !!(
+ taskId &&
+ runId &&
+ !isGroup &&
+ !isMappedTaskSummary
+ );
const [searchParams, setSearchParams] = useSearchParams();
const tab = searchParams.get(TAB_PARAM) || undefined;
@@ -146,12 +161,17 @@ const Details = ({
const onChangeTab = useCallback(
(index: number) => {
const params = new URLSearchParamsWrapper(searchParams);
- const newTab = indexToTab(index, taskId, showLogs, showMappedTasks);
+ const newTab = indexToTab(
+ index,
+ taskId,
+ isTaskInstance,
+ isMappedTaskSummary
+ );
if (newTab) params.set(TAB_PARAM, newTab);
else params.delete(TAB_PARAM);
setSearchParams(params);
},
- [setSearchParams, searchParams, showLogs, showMappedTasks, taskId]
+ [setSearchParams, searchParams, isTaskInstance, isMappedTaskSummary,
taskId]
);
useEffect(() => {
@@ -252,7 +272,7 @@ const Details = ({
Code
</Text>
</Tab>
- {showLogs && (
+ {isTaskInstance && (
<Tab>
<MdReorder size={16} />
<Text as="strong" ml={1}>
@@ -260,7 +280,7 @@ const Details = ({
</Text>
</Tab>
)}
- {showMappedTasks && (
+ {isMappedTaskSummary && (
<Tab>
<BiBracket size={16} />
<Text as="strong" ml={1}>
@@ -268,6 +288,14 @@ const Details = ({
</Text>
</Tab>
)}
+ {isTaskInstance && (
+ <Tab>
+ <MdSyncAlt size={16} />
+ <Text as="strong" ml={1}>
+ XCom
+ </Text>
+ </Tab>
+ )}
</TabList>
<TabPanels height="100%">
<TabPanel height="100%">
@@ -304,7 +332,7 @@ const Details = ({
<TabPanel height="100%">
<DagCode />
</TabPanel>
- {showLogs && run && (
+ {isTaskInstance && run && (
<TabPanel
pt={mapIndex !== undefined ? "0px" : undefined}
height="100%"
@@ -324,7 +352,7 @@ const Details = ({
/>
</TabPanel>
)}
- {showMappedTasks && (
+ {isMappedTaskSummary && (
<TabPanel height="100%">
<MappedInstances
dagId={dagId}
@@ -336,6 +364,17 @@ const Details = ({
/>
</TabPanel>
)}
+ {isTaskInstance && (
+ <TabPanel height="100%">
+ <XcomCollection
+ dagId={dagId}
+ dagRunId={runId}
+ taskId={taskId}
+ mapIndex={mapIndex}
+ tryNumber={instance?.tryNumber}
+ />
+ </TabPanel>
+ )}
</TabPanels>
</Tabs>
</Flex>
diff --git a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
index 3541240ce3..1e5d84d8ba 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx
@@ -30,7 +30,6 @@ const isK8sExecutor =
getMetaValue("k8s_or_k8scelery_executor") === "True";
const taskInstancesUrl = getMetaValue("task_instances_list_url");
const renderedK8sUrl = getMetaValue("rendered_k8s_url");
const renderedTemplatesUrl = getMetaValue("rendered_templates_url");
-const xcomUrl = getMetaValue("xcom_url");
const taskUrl = getMetaValue("task_url");
const gridUrl = getMetaValue("grid_url");
@@ -52,7 +51,6 @@ const Nav = forwardRef<HTMLDivElement, Props>(
});
const detailsLink = `${taskUrl}&${params}`;
const renderedLink = `${renderedTemplatesUrl}&${params}`;
- const xcomLink = `${xcomUrl}&${params}`;
const k8sLink = `${renderedK8sUrl}&${params}`;
const listParams = new URLSearchParamsWrapper({
_flt_3_dag_id: dagId,
@@ -88,7 +86,6 @@ const Nav = forwardRef<HTMLDivElement, Props>(
{isSubDag && (
<LinkButton href={subDagLink}>Zoom into SubDag</LinkButton>
)}
- <LinkButton href={xcomLink}>XCom</LinkButton>
</>
)}
<LinkButton
diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
new file mode 100644
index 0000000000..59e3d06505
--- /dev/null
+++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
@@ -0,0 +1,82 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react";
+import React from "react";
+import { useTaskXcomEntry } from "src/api";
+import type { Dag, DagRun, TaskInstance } from "src/types";
+
+interface Props {
+ dagId: Dag["id"];
+ dagRunId: DagRun["runId"];
+ taskId: TaskInstance["taskId"];
+ mapIndex?: TaskInstance["mapIndex"];
+ xcomKey: string;
+ tryNumber: TaskInstance["tryNumber"];
+}
+
+const XcomEntry = ({
+ dagId,
+ dagRunId,
+ taskId,
+ mapIndex,
+ xcomKey,
+ tryNumber,
+}: Props) => {
+ const {
+ data: xcom,
+ isLoading,
+ error,
+ } = useTaskXcomEntry({
+ dagId,
+ dagRunId,
+ taskId,
+ mapIndex,
+ xcomKey,
+ tryNumber: tryNumber || 1,
+ });
+
+ let content = <Text fontFamily="monospace">{xcom?.value}</Text>;
+ if (isLoading) {
+ content = <Spinner />;
+ } else if (error) {
+ content = (
+ <Alert status="error">
+ <AlertIcon />
+ Error loading XCom entry
+ </Alert>
+ );
+ } else if (!xcom) {
+ content = (
+ <Alert status="info">
+ <AlertIcon />
+ No value found for XCom key
+ </Alert>
+ );
+ }
+
+ return (
+ <Tr>
+ <Td>{xcomKey}</Td>
+ <Td>{content}</Td>
+ </Tr>
+ );
+};
+
+export default XcomEntry;
diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx
b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx
new file mode 100644
index 0000000000..065f9ff497
--- /dev/null
+++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx
@@ -0,0 +1,116 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { useRef } from "react";
+import type { Dag, DagRun, TaskInstance } from "src/types";
+import {
+ Table,
+ Text,
+ Thead,
+ Tbody,
+ Tr,
+ Td,
+ Spinner,
+ Alert,
+ AlertIcon,
+ Box,
+} from "@chakra-ui/react";
+import { useTaskXcomCollection } from "src/api";
+import { useOffsetTop } from "src/utils";
+import XcomEntry from "./XcomEntry";
+
+interface Props {
+ dagId: Dag["id"];
+ dagRunId: DagRun["runId"];
+ taskId: TaskInstance["taskId"];
+ mapIndex?: TaskInstance["mapIndex"];
+ tryNumber: TaskInstance["tryNumber"];
+}
+
+const XcomCollection = ({
+ dagId,
+ dagRunId,
+ taskId,
+ mapIndex,
+ tryNumber,
+}: Props) => {
+ const taskXcomRef = useRef<HTMLDivElement>(null);
+ const offsetTop = useOffsetTop(taskXcomRef);
+
+ const {
+ data: xcomCollection,
+ isLoading,
+ error,
+ } = useTaskXcomCollection({
+ dagId,
+ dagRunId,
+ taskId,
+ mapIndex,
+ tryNumber: tryNumber || 1,
+ });
+
+ return (
+ <Box
+ ref={taskXcomRef}
+ height="100%"
+ maxHeight={`calc(100% - ${offsetTop}px)`}
+ overflowY="auto"
+ >
+ {isLoading && <Spinner size="xl" thickness="4px" speed="0.65s" />}
+ {!!error && (
+ <Alert status="error" marginBottom="10px">
+ <AlertIcon />
+ An error occurred while fetching task XCom.
+ </Alert>
+ )}
+ {xcomCollection &&
+ (xcomCollection.totalEntries === 0 ? (
+ <Text>No XCom</Text>
+ ) : (
+ <Table variant="striped">
+ <Thead>
+ <Tr>
+ <Td>
+ <Text as="b">Key</Text>
+ </Td>
+ <Td>
+ <Text as="b">Value</Text>
+ </Td>
+ </Tr>
+ </Thead>
+ <Tbody>
+ {xcomCollection.xcomEntries?.map((xcomEntry) => (
+ <XcomEntry
+ key={xcomEntry.key}
+ dagId={dagId}
+ dagRunId={dagRunId}
+ taskId={taskId}
+ mapIndex={mapIndex}
+ xcomKey={xcomEntry.key || ""}
+ tryNumber={tryNumber}
+ />
+ ))}
+ </Tbody>
+ </Table>
+ ))}
+ </Box>
+ );
+};
+
+export default XcomCollection;
diff --git a/airflow/www/templates/airflow/dag.html
b/airflow/www/templates/airflow/dag.html
index 435dacb50c..8d39d05a40 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -63,13 +63,14 @@
<meta name="graph_url" content="{{ url_for('Airflow.graph',
dag_id=dag.dag_id, root=root) }}">
<meta name="task_url" content="{{ url_for('Airflow.task', dag_id=dag.dag_id)
}}">
<meta name="log_url" content="{{ url_for('Airflow.log', dag_id=dag.dag_id)
}}">
- <meta name="xcom_url" content="{{ url_for('Airflow.xcom', dag_id=dag.dag_id)
}}">
<meta name="rendered_templates_url" content="{{
url_for('Airflow.rendered_templates', dag_id=dag.dag_id) }}">
<meta name="rendered_k8s_url" content="{{ url_for('Airflow.rendered_k8s',
dag_id=dag.dag_id) }}">
<meta name="task_instances_list_url" content="{{
url_for('TaskInstanceModelView.list') }}">
<meta name="tag_index_url" content="{{ url_for('Airflow.index',
tags='_TAG_NAME_') }}">
<meta name="mapped_instances_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_mapped_task_instances',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
<meta name="task_log_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_log_endpoint_get_log',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_',
task_try_number='-1') }}">
+ <meta name="task_xcom_entries_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_xcom_endpoint_get_xcom_entries',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
+ <meta name="task_xcom_entry_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_xcom_endpoint_get_xcom_entry',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_',
xcom_key='_XCOM_KEY_') }}">
<meta name="upstream_dataset_events_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_upstream_dataset_events',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_') }}">
<meta name="task_instance_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_task_instance',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
<meta name="set_task_instance_note" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_set_task_instance_note',
dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_' ) }}">