This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit be86dd37ebc393ef5eee721b15e172e4e1bfd848 Author: Huy Duong <[email protected]> AuthorDate: Mon Dec 4 15:59:37 2023 +0000 Add XCom tab to Grid (#35719) * Add XCom tab to Grid * Combine showLogs and showXcom logic evaluation to isIndividualTaskInstance * Remove link to /xcom page from UI grid view * Use consistent naming to distinguish XcomCollection and XcomEntry * Refactor boolean vars (cherry picked from commit 77c01031d6c569d26f6fabd331597b7e87274baa) --- airflow/www/static/js/api/index.ts | 3 + airflow/www/static/js/api/useTaskXcom.ts | 71 +++++++++++++ airflow/www/static/js/dag/details/index.tsx | 71 ++++++++++--- .../www/static/js/dag/details/taskInstance/Nav.tsx | 3 - .../js/dag/details/taskInstance/Xcom/XcomEntry.tsx | 82 +++++++++++++++ .../js/dag/details/taskInstance/Xcom/index.tsx | 116 +++++++++++++++++++++ airflow/www/templates/airflow/dag.html | 3 +- 7 files changed, 329 insertions(+), 20 deletions(-) diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 782a4f99a1..6369a819d2 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -48,6 +48,7 @@ import usePools from "./usePools"; import useDags from "./useDags"; import useDagRuns from "./useDagRuns"; import useHistoricalMetricsData from "./useHistoricalMetricsData"; +import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -91,4 +92,6 @@ export { useTaskInstance, useUpstreamDatasetEvents, useHistoricalMetricsData, + useTaskXcomEntry, + useTaskXcomCollection, }; diff --git a/airflow/www/static/js/api/useTaskXcom.ts b/airflow/www/static/js/api/useTaskXcom.ts new file mode 100644 index 0000000000..1faa19005a --- /dev/null +++ b/airflow/www/static/js/api/useTaskXcom.ts @@ -0,0 +1,71 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import type { API } from "src/types"; +import { getMetaValue } from "src/utils"; +import { useQuery } from "react-query"; +import axios, { AxiosResponse } from "axios"; + +// tryNumber is not required to get XCom keys or values but is used +// in query key so refetch will occur if new tries are available +interface TaskXcomCollectionProps extends API.GetXcomEntriesVariables { + tryNumber: number; +} +interface TaskXcomProps extends API.GetXcomEntryVariables { + tryNumber: number; +} + +export const useTaskXcomCollection = ({ + dagId, + dagRunId, + taskId, + mapIndex, + tryNumber, +}: TaskXcomCollectionProps) => + useQuery(["taskXcoms", dagId, dagRunId, taskId, mapIndex, tryNumber], () => + axios.get<AxiosResponse, API.XComCollection>( + getMetaValue("task_xcom_entries_api") + .replace("_DAG_RUN_ID_", dagRunId) + .replace("_TASK_ID_", taskId), + { params: { map_index: mapIndex } } + ) + ); + +export const useTaskXcomEntry = ({ + dagId, + dagRunId, + taskId, + mapIndex, + xcomKey, + tryNumber, +}: TaskXcomProps) => + useQuery( + ["taskXcom", dagId, dagRunId, taskId, mapIndex, xcomKey, tryNumber], + () => + axios.get<AxiosResponse, API.XCom>( + getMetaValue("task_xcom_entry_api") + .replace("_DAG_RUN_ID_", dagRunId) + .replace("_TASK_ID_", taskId) + .replace("_XCOM_KEY_", xcomKey), + { params: { map_index: mapIndex } } + ), + { + enabled: !!xcomKey, + } + ); diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx index b476d61950..3c555c701e 100644 --- a/airflow/www/static/js/dag/details/index.tsx +++ b/airflow/www/static/js/dag/details/index.tsx @@ -39,6 +39,7 @@ import { MdReorder, MdCode, MdOutlineViewTimeline, + MdSyncAlt, } from "react-icons/md"; import { BiBracket } from "react-icons/bi"; import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper"; @@ -58,6 +59,7 @@ import ClearRun from "./dagRun/ClearRun"; import MarkRunAs from "./dagRun/MarkRunAs"; import ClearInstance from "./taskInstance/taskActions/ClearInstance"; import MarkInstanceAs from "./taskInstance/taskActions/MarkInstanceAs"; +import XcomCollection from "./taskInstance/Xcom"; const dagId = getMetaValue("dag_id")!; @@ -80,6 +82,8 @@ const tabToIndex = (tab?: string) => { case "logs": case "mapped_tasks": return 4; + case "xcom": + return 5; case "details": default: return 0; @@ -89,8 +93,8 @@ const tabToIndex = (tab?: string) => { const indexToTab = ( index: number, taskId: string | null, - showLogs: boolean, - showMappedTasks: boolean + isTaskInstance: boolean, + isMappedTaskSummary: boolean ) => { switch (index) { case 1: @@ -100,8 +104,11 @@ const indexToTab = ( case 3: return "code"; case 4: - if (showMappedTasks) return "mapped_tasks"; - if (showLogs) return "logs"; + if (isMappedTaskSummary) return "mapped_tasks"; + if (isTaskInstance) return "logs"; + return undefined; + case 5: + if (isTaskInstance) return "xcom"; return undefined; case 0: default: @@ -124,7 +131,6 @@ const Details = ({ } = useSelection(); const isDag = !runId && !taskId; const isDagRun = runId && !taskId; - const isTaskInstance = taskId && runId; const { data: { dagRuns, groups }, @@ -132,12 +138,21 @@ const Details = ({ const group = getTask({ taskId, task: groups }); const children = group?.children; const isMapped = group?.isMapped; - - const isMappedTaskSummary = isMapped && mapIndex === undefined && taskId; const isGroup = !!children; - const isGroupOrMappedTaskSummary = isGroup || isMappedTaskSummary; - const showLogs = !!(isTaskInstance && !isGroupOrMappedTaskSummary); - const showMappedTasks = !!(isTaskInstance && isMappedTaskSummary && !isGroup); + + const isMappedTaskSummary = !!( + taskId && + runId && + !isGroup && + isMapped && + mapIndex === undefined + ); + const isTaskInstance = !!( + taskId && + runId && + !isGroup && + !isMappedTaskSummary + ); const [searchParams, setSearchParams] = useSearchParams(); const tab = searchParams.get(TAB_PARAM) || undefined; @@ -146,12 +161,17 @@ const Details = ({ const onChangeTab = useCallback( (index: number) => { const params = new URLSearchParamsWrapper(searchParams); - const newTab = indexToTab(index, taskId, showLogs, showMappedTasks); + const newTab = indexToTab( + index, + taskId, + isTaskInstance, + isMappedTaskSummary + ); if (newTab) params.set(TAB_PARAM, newTab); else params.delete(TAB_PARAM); setSearchParams(params); }, - [setSearchParams, searchParams, showLogs, showMappedTasks, taskId] + [setSearchParams, searchParams, isTaskInstance, isMappedTaskSummary, taskId] ); useEffect(() => { @@ -252,7 +272,7 @@ const Details = ({ Code </Text> </Tab> - {showLogs && ( + {isTaskInstance && ( <Tab> <MdReorder size={16} /> <Text as="strong" ml={1}> @@ -260,7 +280,7 @@ const Details = ({ </Text> </Tab> )} - {showMappedTasks && ( + {isMappedTaskSummary && ( <Tab> <BiBracket size={16} /> <Text as="strong" ml={1}> @@ -268,6 +288,14 @@ const Details = ({ </Text> </Tab> )} + {isTaskInstance && ( + <Tab> + <MdSyncAlt size={16} /> + <Text as="strong" ml={1}> + XCom + </Text> + </Tab> + )} </TabList> <TabPanels height="100%"> <TabPanel height="100%"> @@ -304,7 +332,7 @@ const Details = ({ <TabPanel height="100%"> <DagCode /> </TabPanel> - {showLogs && run && ( + {isTaskInstance && run && ( <TabPanel pt={mapIndex !== undefined ? "0px" : undefined} height="100%" @@ -324,7 +352,7 @@ const Details = ({ /> </TabPanel> )} - {showMappedTasks && ( + {isMappedTaskSummary && ( <TabPanel height="100%"> <MappedInstances dagId={dagId} @@ -336,6 +364,17 @@ const Details = ({ /> </TabPanel> )} + {isTaskInstance && ( + <TabPanel height="100%"> + <XcomCollection + dagId={dagId} + dagRunId={runId} + taskId={taskId} + mapIndex={mapIndex} + tryNumber={instance?.tryNumber} + /> + </TabPanel> + )} </TabPanels> </Tabs> </Flex> diff --git a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx index 3541240ce3..1e5d84d8ba 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Nav.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Nav.tsx @@ -30,7 +30,6 @@ const isK8sExecutor = getMetaValue("k8s_or_k8scelery_executor") === "True"; const taskInstancesUrl = getMetaValue("task_instances_list_url"); const renderedK8sUrl = getMetaValue("rendered_k8s_url"); const renderedTemplatesUrl = getMetaValue("rendered_templates_url"); -const xcomUrl = getMetaValue("xcom_url"); const taskUrl = getMetaValue("task_url"); const gridUrl = getMetaValue("grid_url"); @@ -52,7 +51,6 @@ const Nav = forwardRef<HTMLDivElement, Props>( }); const detailsLink = `${taskUrl}&${params}`; const renderedLink = `${renderedTemplatesUrl}&${params}`; - const xcomLink = `${xcomUrl}&${params}`; const k8sLink = `${renderedK8sUrl}&${params}`; const listParams = new URLSearchParamsWrapper({ _flt_3_dag_id: dagId, @@ -88,7 +86,6 @@ const Nav = forwardRef<HTMLDivElement, Props>( {isSubDag && ( <LinkButton href={subDagLink}>Zoom into SubDag</LinkButton> )} - <LinkButton href={xcomLink}>XCom</LinkButton> </> )} <LinkButton diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx new file mode 100644 index 0000000000..59e3d06505 --- /dev/null +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx @@ -0,0 +1,82 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react"; +import React from "react"; +import { useTaskXcomEntry } from "src/api"; +import type { Dag, DagRun, TaskInstance } from "src/types"; + +interface Props { + dagId: Dag["id"]; + dagRunId: DagRun["runId"]; + taskId: TaskInstance["taskId"]; + mapIndex?: TaskInstance["mapIndex"]; + xcomKey: string; + tryNumber: TaskInstance["tryNumber"]; +} + +const XcomEntry = ({ + dagId, + dagRunId, + taskId, + mapIndex, + xcomKey, + tryNumber, +}: Props) => { + const { + data: xcom, + isLoading, + error, + } = useTaskXcomEntry({ + dagId, + dagRunId, + taskId, + mapIndex, + xcomKey, + tryNumber: tryNumber || 1, + }); + + let content = <Text fontFamily="monospace">{xcom?.value}</Text>; + if (isLoading) { + content = <Spinner />; + } else if (error) { + content = ( + <Alert status="error"> + <AlertIcon /> + Error loading XCom entry + </Alert> + ); + } else if (!xcom) { + content = ( + <Alert status="info"> + <AlertIcon /> + No value found for XCom key + </Alert> + ); + } + + return ( + <Tr> + <Td>{xcomKey}</Td> + <Td>{content}</Td> + </Tr> + ); +}; + +export default XcomEntry; diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx new file mode 100644 index 0000000000..065f9ff497 --- /dev/null +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/index.tsx @@ -0,0 +1,116 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React, { useRef } from "react"; +import type { Dag, DagRun, TaskInstance } from "src/types"; +import { + Table, + Text, + Thead, + Tbody, + Tr, + Td, + Spinner, + Alert, + AlertIcon, + Box, +} from "@chakra-ui/react"; +import { useTaskXcomCollection } from "src/api"; +import { useOffsetTop } from "src/utils"; +import XcomEntry from "./XcomEntry"; + +interface Props { + dagId: Dag["id"]; + dagRunId: DagRun["runId"]; + taskId: TaskInstance["taskId"]; + mapIndex?: TaskInstance["mapIndex"]; + tryNumber: TaskInstance["tryNumber"]; +} + +const XcomCollection = ({ + dagId, + dagRunId, + taskId, + mapIndex, + tryNumber, +}: Props) => { + const taskXcomRef = useRef<HTMLDivElement>(null); + const offsetTop = useOffsetTop(taskXcomRef); + + const { + data: xcomCollection, + isLoading, + error, + } = useTaskXcomCollection({ + dagId, + dagRunId, + taskId, + mapIndex, + tryNumber: tryNumber || 1, + }); + + return ( + <Box + ref={taskXcomRef} + height="100%" + maxHeight={`calc(100% - ${offsetTop}px)`} + overflowY="auto" + > + {isLoading && <Spinner size="xl" thickness="4px" speed="0.65s" />} + {!!error && ( + <Alert status="error" marginBottom="10px"> + <AlertIcon /> + An error occurred while fetching task XCom. + </Alert> + )} + {xcomCollection && + (xcomCollection.totalEntries === 0 ? ( + <Text>No XCom</Text> + ) : ( + <Table variant="striped"> + <Thead> + <Tr> + <Td> + <Text as="b">Key</Text> + </Td> + <Td> + <Text as="b">Value</Text> + </Td> + </Tr> + </Thead> + <Tbody> + {xcomCollection.xcomEntries?.map((xcomEntry) => ( + <XcomEntry + key={xcomEntry.key} + dagId={dagId} + dagRunId={dagRunId} + taskId={taskId} + mapIndex={mapIndex} + xcomKey={xcomEntry.key || ""} + tryNumber={tryNumber} + /> + ))} + </Tbody> + </Table> + ))} + </Box> + ); +}; + +export default XcomCollection; diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 435dacb50c..8d39d05a40 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -63,13 +63,14 @@ <meta name="graph_url" content="{{ url_for('Airflow.graph', dag_id=dag.dag_id, root=root) }}"> <meta name="task_url" content="{{ url_for('Airflow.task', dag_id=dag.dag_id) }}"> <meta name="log_url" content="{{ url_for('Airflow.log', dag_id=dag.dag_id) }}"> - <meta name="xcom_url" content="{{ url_for('Airflow.xcom', dag_id=dag.dag_id) }}"> <meta name="rendered_templates_url" content="{{ url_for('Airflow.rendered_templates', dag_id=dag.dag_id) }}"> <meta name="rendered_k8s_url" content="{{ url_for('Airflow.rendered_k8s', dag_id=dag.dag_id) }}"> <meta name="task_instances_list_url" content="{{ url_for('TaskInstanceModelView.list') }}"> <meta name="tag_index_url" content="{{ url_for('Airflow.index', tags='_TAG_NAME_') }}"> <meta name="mapped_instances_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_mapped_task_instances', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}"> <meta name="task_log_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_log_endpoint_get_log', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', task_try_number='-1') }}"> + <meta name="task_xcom_entries_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_xcom_endpoint_get_xcom_entries', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}"> + <meta name="task_xcom_entry_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_xcom_endpoint_get_xcom_entry', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', xcom_key='_XCOM_KEY_') }}"> <meta name="upstream_dataset_events_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_upstream_dataset_events', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_') }}"> <meta name="task_instance_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_task_instance', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}"> <meta name="set_task_instance_note" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_set_task_instance_note', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_' ) }}">
