This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch mapped-task-drawer in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e106d1721b9340ff031a4793d3b485265d33edba Author: Brent Bovenzi <[email protected]> AuthorDate: Wed Mar 2 14:23:10 2022 -0500 make side panel collapsible, useTasks, --- airflow/www/static/js/tree/StatusBox.jsx | 3 +- airflow/www/static/js/tree/Tree.jsx | 126 +++++++++++++------ airflow/www/static/js/tree/api/useDag.js | 2 +- airflow/www/static/js/tree/dagRuns/Bar.jsx | 5 +- airflow/www/static/js/tree/details/Header.jsx | 3 +- airflow/www/static/js/tree/details/content/Dag.jsx | 27 +++- airflow/www/static/js/tree/details/index.jsx | 1 - airflow/www/static/js/tree/renderTaskRows.jsx | 8 +- airflow/www/utils.py | 136 +++++++++++---------- 9 files changed, 196 insertions(+), 115 deletions(-) diff --git a/airflow/www/static/js/tree/StatusBox.jsx b/airflow/www/static/js/tree/StatusBox.jsx index 62762c5..0f5294f 100644 --- a/airflow/www/static/js/tree/StatusBox.jsx +++ b/airflow/www/static/js/tree/StatusBox.jsx @@ -48,8 +48,7 @@ const StatusBox = ({ .forEach((e) => { e.style.backgroundColor = null; }); }; - const onClick = (e) => { - e.stopPropagation(); + const onClick = () => { onMouseLeave(); onSelect({ taskId, runId, instance, task: group, diff --git a/airflow/www/static/js/tree/Tree.jsx b/airflow/www/static/js/tree/Tree.jsx index aeeaeb5..0d84639 100644 --- a/airflow/www/static/js/tree/Tree.jsx +++ b/airflow/www/static/js/tree/Tree.jsx @@ -17,6 +17,8 @@ * under the License. */ +/* global localStorage */ + import React, { useRef, useEffect, useState } from 'react'; import { Table, @@ -29,18 +31,32 @@ import { Text, Thead, Flex, + useDisclosure, + IconButton, } from '@chakra-ui/react'; +import { MdArrowForward, MdArrowBack } from 'react-icons/md'; import useTreeData from './useTreeData'; import renderTaskRows from './renderTaskRows'; import DagRuns from './dagRuns'; import Details from './details'; +import { callModal, callModalDag } from '../dag'; + +const sidePanelKey = 'showSidePanel'; const Tree = () => { const containerRef = useRef(); const scrollRef = useRef(); const { data: { groups = {}, dagRuns = [] }, isRefreshOn, onToggleRefresh } = useTreeData(); - const [selected, setSelected] = useState({}); + const [selected, setSelected] = useState({}); // selected task instance or dag run + const isPanelOpen = JSON.parse(localStorage.getItem(sidePanelKey)); + const { isOpen, onToggle } = useDisclosure({ defaultIsOpen: isPanelOpen }); + + const toggleSidePanel = () => { + if (!isOpen) localStorage.setItem(sidePanelKey, true); + else localStorage.setItem(sidePanelKey, false); + onToggle(); + }; const dagRunIds = dagRuns.map((dr) => dr.runId); @@ -48,46 +64,88 @@ const Tree = () => { // Set initial scroll to far right if it is scrollable const runsContainer = scrollRef.current; if (runsContainer && runsContainer.scrollWidth > runsContainer.clientWidth) { - runsContainer.scrollBy(runsContainer.clientWidth, 0); + runsContainer.scrollBy(runsContainer.clientWidth, 250); } - }, []); + }, [isOpen]); // isOpen is to redo the scroll when the side panel opens/closes const { runId, taskId } = selected; - const onSelect = (newInstance) => ( - (newInstance.runId === runId && newInstance.taskId === taskId) - ? setSelected({}) - : setSelected(newInstance) - ); + + // show task/run info in the side panel, or just call the regular action modal + const onSelect = (newSelected) => { + if (isOpen) { + const isSame = newSelected.runId === runId && newSelected.taskId === taskId; + setSelected(isSame ? {} : newSelected); + } else if (!isOpen) { + if (newSelected.dagRun) { + const { dagRun } = newSelected; + callModalDag({ + execution_date: dagRun.executionDate, + dag_id: dagRun.dagId, + run_id: dagRun.runId, + }); + } else if (newSelected.instance) { + const extraLinks = newSelected.task.extraLinks || []; + const { instance } = newSelected; + callModal( + taskId, + instance.executionDate, + extraLinks, + instance.tryNumber, + instance.operator === 'SubDagOperator' || undefined, + instance.runId, + ); + } + } + }; return ( - <Flex pl="24px" position="relative" flexDirection="row" justifyContent="space-between" ref={containerRef}> + <Box pl="24px" position="relative" ref={containerRef}> <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="130px">Runs</Text> <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="190px">Tasks</Text> - <FormControl display="flex" position="absolute" left="220px"> - {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />} - <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal"> - Auto-refresh - </FormLabel> - <Switch id="auto-refresh" onChange={onToggleRefresh} isChecked={isRefreshOn} size="lg" /> - </FormControl> - <Box mr="12px" pb="12px" overflowX="auto" ref={scrollRef} maxWidth="300px" minWidth="300px" position="relative" mt="24px"> - <Table height={0}> - <Thead> - <DagRuns - containerRef={containerRef} - selected={selected} - onSelect={onSelect} - /> - </Thead> - <Tbody> - {renderTaskRows({ - task: groups, containerRef, onSelect, selected, dagRunIds, - })} - </Tbody> - </Table> - </Box> - <Details selected={selected} onSelect={onSelect} /> - </Flex> + <Flex flexGrow={1} justifyContent="flex-end" alignItems="center"> + <FormControl display="flex" width="auto" mr={2}> + {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />} + <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal"> + Auto-refresh + </FormLabel> + <Switch id="auto-refresh" onChange={onToggleRefresh} isChecked={isRefreshOn} size="lg" /> + </FormControl> + <IconButton onClick={toggleSidePanel}> + {isOpen + ? <MdArrowForward size="18px" aria-label="Collapse Details" title="Collapse Details" /> + : <MdArrowBack size="18px" title="Expand Details" aria-label="Expand Details" />} + </IconButton> + </Flex> + <Flex flexDirection="row" justifyContent="space-between"> + <Box + mr="12px" + mt="24px" + pb="12px" + overflowX="auto" + ref={scrollRef} + maxWidth={isOpen && '300px'} + minWidth={isOpen && '300px'} + > + <Table height={0}> + <Thead> + <DagRuns + containerRef={containerRef} + selected={selected} + onSelect={onSelect} + /> + </Thead> + <Tbody> + {renderTaskRows({ + task: groups, containerRef, onSelect, selected, dagRunIds, + })} + </Tbody> + </Table> + </Box> + {isOpen && ( + <Details selected={selected} onSelect={onSelect} /> + )} + </Flex> + </Box> ); }; diff --git a/airflow/www/static/js/tree/api/useDag.js b/airflow/www/static/js/tree/api/useDag.js index 6c19ee4..1343302 100644 --- a/airflow/www/static/js/tree/api/useDag.js +++ b/airflow/www/static/js/tree/api/useDag.js @@ -23,6 +23,6 @@ import { useQuery } from 'react-query'; export default function useDag(dagId) { return useQuery( ['dag', dagId], - () => axios.get(`/dags/${dagId}`), + () => axios.get(`/dags/${dagId}/details`), ); } diff --git a/airflow/www/static/js/tree/dagRuns/Bar.jsx b/airflow/www/static/js/tree/dagRuns/Bar.jsx index 94c84e7..14df756 100644 --- a/airflow/www/static/js/tree/dagRuns/Bar.jsx +++ b/airflow/www/static/js/tree/dagRuns/Bar.jsx @@ -57,10 +57,7 @@ const DagRunBar = ({ cursor="pointer" width="14px" zIndex={1} - onClick={(e) => { - e.stopPropagation(); - onSelect({ runId: run.runId, dagRun: run }); - }} + onClick={() => onSelect({ runId: run.runId, dagRun: run })} position="relative" data-peer > diff --git a/airflow/www/static/js/tree/details/Header.jsx b/airflow/www/static/js/tree/details/Header.jsx index c362606..2982a4e 100644 --- a/airflow/www/static/js/tree/details/Header.jsx +++ b/airflow/www/static/js/tree/details/Header.jsx @@ -45,7 +45,6 @@ const Header = ({ dagRuns, }) => { const dagRun = dagRuns.find((r) => r.runId === runId); - // console.log(dagRun); let runLabel = dagRun ? formatDateTime(dagRun.dataIntervalEnd) : ''; if (dagRun && dagRun.runType === 'manual') { runLabel = ( @@ -65,7 +64,7 @@ const Header = ({ </BreadcrumbItem> {runId && ( <BreadcrumbItem isCurrentPage={runId && !taskId}> - <BreadcrumbLink onClick={() => onSelect({ runId })}> + <BreadcrumbLink onClick={() => onSelect({ runId, dagRun })}> <LabelValue label="Run" value={runLabel} /> </BreadcrumbLink> </BreadcrumbItem> diff --git a/airflow/www/static/js/tree/details/content/Dag.jsx b/airflow/www/static/js/tree/details/content/Dag.jsx index e71d9c2..7ee19ec 100644 --- a/airflow/www/static/js/tree/details/content/Dag.jsx +++ b/airflow/www/static/js/tree/details/content/Dag.jsx @@ -27,13 +27,23 @@ import { } from '@chakra-ui/react'; import { getMetaValue } from '../../../utils'; -import { useDag } from '../../api'; +import { useDag, useTasks } from '../../api'; const dagId = getMetaValue('dag_id'); const Dag = () => { const { data: dag } = useDag(dagId); - if (!dag) return null; + const { data: taskData } = useTasks(dagId); + if (!dag || !taskData) return null; + const { tasks = [], totalEntries = '' } = taskData; + const operators = {}; + tasks.forEach((t) => { + if (!operators[t.classRef.className]) { + operators[t.classRef.className] = 1; + } else { + operators[t.classRef.className] += 1; + } + }); const { description, tags, fileloc, owners, } = dag; @@ -50,6 +60,19 @@ const Dag = () => { <Text mr={2}>Owner:</Text> {owners.map((o) => <Text key={o}>{o}</Text>)} </Flex> + <Text> + {totalEntries} + {' '} + Tasks + </Text> + {Object.entries(operators).map(([key, value]) => ( + <Text key={key}> + {value} + {' '} + {key} + {value > 1 && 's'} + </Text> + ))} </> ); }; diff --git a/airflow/www/static/js/tree/details/index.jsx b/airflow/www/static/js/tree/details/index.jsx index ee3b044..c932b05 100644 --- a/airflow/www/static/js/tree/details/index.jsx +++ b/airflow/www/static/js/tree/details/index.jsx @@ -35,7 +35,6 @@ const Details = ({ onSelect, }) => { const { data: { dagRuns = [] } } = useTreeData(); - console.log(selected); return ( <Flex borderLeftWidth="1px" flexDirection="column" p={3} flexGrow={1}> <Header selected={selected} onSelect={onSelect} dagRuns={dagRuns} /> diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx index 36e0b94..f5a2345 100644 --- a/airflow/www/static/js/tree/renderTaskRows.jsx +++ b/airflow/www/static/js/tree/renderTaskRows.jsx @@ -40,18 +40,14 @@ import { getMetaValue } from '../utils'; const dagId = getMetaValue('dag_id'); const renderTaskRows = ({ - task, containerRef, level = 0, isParentOpen, onSelect, selected, dagRunIds, + task, level = 0, ...rest }) => task.children.map((t) => ( <Row + {...rest} key={t.id} task={t} level={level} - containerRef={containerRef} prevTaskId={task.id} - isParentOpen={isParentOpen} - onSelect={onSelect} - selected={selected} - dagRunIds={dagRunIds} /> )); diff --git a/airflow/www/utils.py b/airflow/www/utils.py index ed765d4..b1f5e0d 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -43,8 +43,9 @@ from airflow.models import errors from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone from airflow.utils.code_utils import get_python_source +from airflow.utils.helpers import alchemy_to_dict from airflow.utils.json import AirflowJsonEncoder -from airflow.utils.state import State +from airflow.utils.state import State, TaskInstanceState from airflow.www.forms import DateTimeWithTimezoneField from airflow.www.widgets import AirflowDateTimePickerWidget @@ -55,13 +56,82 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]: return value.isoformat() +def get_mapped_instances(task_instance, session): + return ( + session.query(TaskInstance) + .filter( + TaskInstance.dag_id == task_instance.dag_id, + TaskInstance.run_id == task_instance.run_id, + TaskInstance.task_id == task_instance.task_id, + TaskInstance.map_index >= 0, + ) + .all() + ) + + +def get_instance_with_map(task_instance, session): + if task_instance.map_index == -1: + return alchemy_to_dict(task_instance) + mapped_instances = get_mapped_instances(task_instance, session) + return get_mapped_summary(task_instance, mapped_instances) + + +def get_mapped_summary(parent_instance, task_instances): + priority = [ + TaskInstanceState.FAILED, + TaskInstanceState.UPSTREAM_FAILED, + TaskInstanceState.UP_FOR_RETRY, + TaskInstanceState.UP_FOR_RESCHEDULE, + TaskInstanceState.QUEUED, + TaskInstanceState.SCHEDULED, + TaskInstanceState.DEFERRED, + TaskInstanceState.SENSING, + TaskInstanceState.RUNNING, + TaskInstanceState.SHUTDOWN, + TaskInstanceState.RESTARTING, + TaskInstanceState.REMOVED, + TaskInstanceState.SUCCESS, + TaskInstanceState.SKIPPED, + ] + + mapped_states = [ti.state for ti in task_instances] + + group_state = None + for state in priority: + if state in mapped_states: + group_state = state + break + + group_start_date = datetime_to_string( + min((ti.start_date for ti in task_instances if ti.start_date), default=None) + ) + group_end_date = datetime_to_string( + max((ti.end_date for ti in task_instances if ti.end_date), default=None) + ) + + return { + 'task_id': parent_instance.task_id, + 'run_id': parent_instance.run_id, + 'state': group_state, + 'start_date': group_start_date, + 'end_date': group_end_date, + 'mapped_states': mapped_states, + 'operator': parent_instance.operator, + 'execution_date': datetime_to_string(parent_instance.execution_date), + 'try_number': parent_instance.try_number, + } + + def encode_ti( - task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session + task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session] ) -> Optional[Dict[str, Any]]: if not task_instance: return None - summary = { + if is_mapped: + return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session)) + + return { 'task_id': task_instance.task_id, 'dag_id': task_instance.dag_id, 'run_id': task_instance.run_id, @@ -74,66 +144,6 @@ def encode_ti( 'try_number': task_instance.try_number, } - def get_mapped_summary(task_instances): - priority = [ - 'failed', - 'upstream_failed', - 'up_for_retry', - 'up_for_reschedule', - 'queued', - 'scheduled', - 'deferred', - 'sensing', - 'running', - 'shutdown', - 'restarting', - 'removed', - 'no_status', - 'success', - 'skipped', - ] - - mapped_states = [ti.state for ti in task_instances] - - group_state = None - for state in priority: - if state in mapped_states: - group_state = state - break - - group_start_date = datetime_to_string( - min((ti.start_date for ti in task_instances if ti.start_date), default=None) - ) - group_end_date = datetime_to_string( - max((ti.end_date for ti in task_instances if ti.end_date), default=None) - ) - - return { - 'task_id': task_instance.task_id, - 'run_id': task_instance.run_id, - 'state': group_state, - 'start_date': group_start_date, - 'end_date': group_end_date, - 'mapped_states': mapped_states, - 'operator': task_instance.operator, - 'execution_date': datetime_to_string(task_instance.execution_date), - 'try_number': task_instance.try_number, - } - - if is_mapped: - return get_mapped_summary( - session.query(TaskInstance) - .filter( - TaskInstance.dag_id == task_instance.dag_id, - TaskInstance.run_id == task_instance.run_id, - TaskInstance.task_id == task_instance.task_id, - TaskInstance.map_index >= 0, - ) - .all() - ) - - return summary - def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]: if not dag_run:
