This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch show-mapped-task-in-tree-view in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9616be41fdfe984fd38604a6d423cdd0d7beb3fc Author: Brent Bovenzi <[email protected]> AuthorDate: Wed Feb 16 11:27:29 2022 -0500 make UI and tree work with mapped tasks --- airflow/www/static/js/tree/InstanceTooltip.jsx | 64 ++++++++++++++++++------ airflow/www/static/js/tree/renderTaskRows.jsx | 7 ++- airflow/www/utils.py | 68 +++++++++++++++++++++++++- airflow/www/views.py | 28 ++++++----- 4 files changed, 138 insertions(+), 29 deletions(-) diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx index a1ef192..612905b 100644 --- a/airflow/www/static/js/tree/InstanceTooltip.jsx +++ b/airflow/www/static/js/tree/InstanceTooltip.jsx @@ -24,30 +24,33 @@ import { Box, Text } from '@chakra-ui/react'; import { formatDateTime, getDuration, formatDuration } from '../datetime_utils'; +const STATES = [ + ['success', 0], + ['failed', 0], + ['upstream_failed', 0], + ['up_for_retry', 0], + ['up_for_reschedule', 0], + ['running', 0], + ['deferred', 0], + ['sensing', 0], + ['queued', 0], + ['scheduled', 0], + ['skipped', 0], + ['no_status', 0], +]; + const InstanceTooltip = ({ group, instance: { - duration, operator, startDate, endDate, state, taskId, runId, + duration, operator, startDate, endDate, state, taskId, runId, mappedStates, }, }) => { const isGroup = !!group.children; const groupSummary = []; + const mapSummary = []; if (isGroup) { - const numMap = new Map([ - ['success', 0], - ['failed', 0], - ['upstream_failed', 0], - ['up_for_retry', 0], - ['up_for_reschedule', 0], - ['running', 0], - ['deferred', 0], - ['sensing', 0], - ['queued', 0], - ['scheduled', 0], - ['skipped', 0], - ['no_status', 0], - ]); + const numMap = new Map(STATES); group.children.forEach((child) => { const taskInstance = child.instances.find((ti) => ti.runId === runId); if (taskInstance) { @@ -69,6 +72,26 @@ const InstanceTooltip = ({ }); } + if (group.isMapped && mappedStates) { + const numMap = new Map(STATES); + mappedStates.forEach((s) => { + const stateKey = s || 'no_status'; + if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1); + }); + numMap.forEach((key, val) => { + if (key > 0) { + mapSummary.push( + // eslint-disable-next-line react/no-array-index-key + <Text key={val} ml="10px"> + {val} + {': '} + {key} + </Text>, + ); + } + }); + } + const taskIdTitle = isGroup ? 'Task Group Id: ' : 'Task Id: '; return ( @@ -88,6 +111,17 @@ const InstanceTooltip = ({ {groupSummary} </> )} + {group.isMapped && ( + <> + <br /> + <Text as="strong"> + {mappedStates.length} + {' '} + Tasks Mapped + </Text> + {mapSummary} + </> + )} <br /> <Text> {taskIdTitle} diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx index 8dcd88d..224885b 100644 --- a/airflow/www/static/js/tree/renderTaskRows.jsx +++ b/airflow/www/static/js/tree/renderTaskRows.jsx @@ -53,7 +53,7 @@ const renderTaskRows = ({ )); const TaskName = ({ - isGroup, onToggle, isOpen, level, taskName, + isGroup = false, isMapped = false, onToggle, isOpen, level, taskName, }) => ( <Box _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s"> <Flex @@ -74,6 +74,9 @@ const TaskName = ({ isTruncated > {taskName} + {isMapped && ( + ' [ ]' + )} </Text> {isGroup && ( isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" /> @@ -107,6 +110,7 @@ const Row = ({ }) => { const { data: { dagRuns = [] } } = useTreeData(); const isGroup = !!task.children; + const taskName = prevTaskId ? task.id.replace(`${prevTaskId}.`, '') : task.id; const storageKey = `${dagId}-open-groups`; @@ -148,6 +152,7 @@ const Row = ({ <TaskName onToggle={onToggle} isGroup={isGroup} + isMapped={task.isMapped} taskName={taskName} isOpen={isOpen} level={level} diff --git a/airflow/www/utils.py b/airflow/www/utils.py index d9ec10f..ed765d4 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -36,9 +36,11 @@ from pendulum.datetime import DateTime from pygments import highlight, lexers from pygments.formatters import HtmlFormatter from sqlalchemy.ext.associationproxy import AssociationProxy +from sqlalchemy.orm import Session from airflow import models from airflow.models import errors +from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone from airflow.utils.code_utils import get_python_source from airflow.utils.json import AirflowJsonEncoder @@ -53,11 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]: return value.isoformat() -def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str, Any]]: +def encode_ti( + task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session +) -> Optional[Dict[str, Any]]: if not task_instance: return None - return { + summary = { 'task_id': task_instance.task_id, 'dag_id': task_instance.dag_id, 'run_id': task_instance.run_id, @@ -70,6 +74,66 @@ def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str 'try_number': task_instance.try_number, } + def get_mapped_summary(task_instances): + priority = [ + 'failed', + 'upstream_failed', + 'up_for_retry', + 'up_for_reschedule', + 'queued', + 'scheduled', + 'deferred', + 'sensing', + 'running', + 'shutdown', + 'restarting', + 'removed', + 'no_status', + 'success', + 'skipped', + ] + + mapped_states = [ti.state for ti in task_instances] + + group_state = None + for state in priority: + if state in mapped_states: + group_state = state + break + + group_start_date = datetime_to_string( + min((ti.start_date for ti in task_instances if ti.start_date), default=None) + ) + group_end_date = datetime_to_string( + max((ti.end_date for ti in task_instances if ti.end_date), default=None) + ) + + return { + 'task_id': task_instance.task_id, + 'run_id': task_instance.run_id, + 'state': group_state, + 'start_date': group_start_date, + 'end_date': group_end_date, + 'mapped_states': mapped_states, + 'operator': task_instance.operator, + 'execution_date': datetime_to_string(task_instance.execution_date), + 'try_number': task_instance.try_number, + } + + if is_mapped: + return get_mapped_summary( + session.query(TaskInstance) + .filter( + TaskInstance.dag_id == task_instance.dag_id, + TaskInstance.run_id == task_instance.run_id, + TaskInstance.task_id == task_instance.task_id, + TaskInstance.map_index >= 0, + ) + .all() + ) + + return summary + def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]: if not dag_run: diff --git a/airflow/www/views.py b/airflow/www/views.py index cd45952..63c9fb1 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -99,7 +99,7 @@ from airflow.jobs.base_job import BaseJob from airflow.jobs.scheduler_job import SchedulerJob from airflow.jobs.triggerer_job import TriggererJob from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors -from airflow.models.baseoperator import BaseOperator +from airflow.models.abstractoperator import AbstractOperator from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun, DagRunType from airflow.models.serialized_dag import SerializedDagModel @@ -223,23 +223,30 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): } -def task_group_to_tree(task_item_or_group, dag, dag_runs, tis): +def task_group_to_tree(task_item_or_group, dag, dag_runs, tis, session): """ Create a nested dict representation of this TaskGroup and its children used to construct the Graph. """ - if isinstance(task_item_or_group, BaseOperator): + if isinstance(task_item_or_group, AbstractOperator): return { 'id': task_item_or_group.task_id, - 'instances': [wwwutils.encode_ti(ti) for ti in tis if ti.task_id == task_item_or_group.task_id], + 'instances': [ + wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session) + for ti in tis + if ti.task_id == task_item_or_group.task_id + ], 'label': task_item_or_group.label, - 'extra_links': task_item_or_group.extra_links, + 'extra_links': [], + 'is_mapped': task_item_or_group.is_mapped, } # Task Group task_group = task_item_or_group - children = [task_group_to_tree(child, dag, dag_runs, tis) for child in task_group.children.values()] + children = [ + task_group_to_tree(child, dag, dag_runs, tis, session) for child in task_group.children.values() + ] def get_summary(dag_run, children): priority = [ @@ -307,7 +314,7 @@ def task_group_to_dict(task_item_or_group): Create a nested dict representation of this TaskGroup and its children used to construct the Graph. """ - if isinstance(task_item_or_group, BaseOperator): + if isinstance(task_item_or_group, AbstractOperator): return { 'id': task_item_or_group.task_id, 'value': { @@ -423,7 +430,7 @@ def dag_edges(dag): def collect_edges(task_group): """Update edges_to_add and edges_to_skip according to TaskGroups.""" - if isinstance(task_group, BaseOperator): + if isinstance(task_group, AbstractOperator): return for target_id in task_group.downstream_group_ids: @@ -2467,7 +2474,7 @@ class Airflow(AirflowBaseView): tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session) data = { - 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis), + 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session), 'dag_runs': encoded_runs, } @@ -3248,7 +3255,6 @@ class Airflow(AirflowBaseView): (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), ] ) - @action_logging def tree_data(self): """Returns tree data""" dag_id = request.args.get('dag_id') @@ -3286,7 +3292,7 @@ class Airflow(AirflowBaseView): min_date = min(dag_run_dates, default=None) tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session) data = { - 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis), + 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session), 'dag_runs': encoded_runs, }
