This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch mapped-task-drawer in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f4763f7474be6d3b4da981dcf587b5f3e6567083
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500

    make UI and tree work with mapped tasks
---
 airflow/www/static/js/tree/InstanceTooltip.jsx |  15 +++
 airflow/www/utils.py                           | 136 ++++++++++++-------------
 2 files changed, 78 insertions(+), 73 deletions(-)

diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index e22189d..bc0d58c 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -25,6 +25,21 @@ import { Box, Text } from '@chakra-ui/react';
 import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
 import { finalStatesMap } from '../utils';
 
+const STATES = [
+  ['success', 0],
+  ['failed', 0],
+  ['upstream_failed', 0],
+  ['up_for_retry', 0],
+  ['up_for_reschedule', 0],
+  ['running', 0],
+  ['deferred', 0],
+  ['sensing', 0],
+  ['queued', 0],
+  ['scheduled', 0],
+  ['skipped', 0],
+  ['no_status', 0],
+];
+
 const InstanceTooltip = ({
   group,
   instance: {
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b1f5e0d..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,9 +43,8 @@ from airflow.models import errors
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
-from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.json import AirflowJsonEncoder
-from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.state import State
 from airflow.www.forms import DateTimeWithTimezoneField
 from airflow.www.widgets import AirflowDateTimePickerWidget
 
@@ -56,82 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
-def get_mapped_instances(task_instance, session):
-    return (
-        session.query(TaskInstance)
-        .filter(
-            TaskInstance.dag_id == task_instance.dag_id,
-            TaskInstance.run_id == task_instance.run_id,
-            TaskInstance.task_id == task_instance.task_id,
-            TaskInstance.map_index >= 0,
-        )
-        .all()
-    )
-
-
-def get_instance_with_map(task_instance, session):
-    if task_instance.map_index == -1:
-        return alchemy_to_dict(task_instance)
-    mapped_instances = get_mapped_instances(task_instance, session)
-    return get_mapped_summary(task_instance, mapped_instances)
-
-
-def get_mapped_summary(parent_instance, task_instances):
-    priority = [
-        TaskInstanceState.FAILED,
-        TaskInstanceState.UPSTREAM_FAILED,
-        TaskInstanceState.UP_FOR_RETRY,
-        TaskInstanceState.UP_FOR_RESCHEDULE,
-        TaskInstanceState.QUEUED,
-        TaskInstanceState.SCHEDULED,
-        TaskInstanceState.DEFERRED,
-        TaskInstanceState.SENSING,
-        TaskInstanceState.RUNNING,
-        TaskInstanceState.SHUTDOWN,
-        TaskInstanceState.RESTARTING,
-        TaskInstanceState.REMOVED,
-        TaskInstanceState.SUCCESS,
-        TaskInstanceState.SKIPPED,
-    ]
-
-    mapped_states = [ti.state for ti in task_instances]
-
-    group_state = None
-    for state in priority:
-        if state in mapped_states:
-            group_state = state
-            break
-
-    group_start_date = datetime_to_string(
-        min((ti.start_date for ti in task_instances if ti.start_date), default=None)
-    )
-    group_end_date = datetime_to_string(
-        max((ti.end_date for ti in task_instances if ti.end_date), default=None)
-    )
-
-    return {
-        'task_id': parent_instance.task_id,
-        'run_id': parent_instance.run_id,
-        'state': group_state,
-        'start_date': group_start_date,
-        'end_date': group_end_date,
-        'mapped_states': mapped_states,
-        'operator': parent_instance.operator,
-        'execution_date': datetime_to_string(parent_instance.execution_date),
-        'try_number': parent_instance.try_number,
-    }
-
-
 def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
 ) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    if is_mapped:
-        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
-
-    return {
+    summary = {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -144,6 +74,66 @@ def encode_ti(
         'try_number': task_instance.try_number,
     }
 
+    def get_mapped_summary(task_instances):
+        priority = [
+            'failed',
+            'upstream_failed',
+            'up_for_retry',
+            'up_for_reschedule',
+            'queued',
+            'scheduled',
+            'deferred',
+            'sensing',
+            'running',
+            'shutdown',
+            'restarting',
+            'removed',
+            'no_status',
+            'success',
+            'skipped',
+        ]
+
+        mapped_states = [ti.state for ti in task_instances]
+
+        group_state = None
+        for state in priority:
+            if state in mapped_states:
+                group_state = state
+                break
+
+        group_start_date = datetime_to_string(
+            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+        )
+        group_end_date = datetime_to_string(
+            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+        )
+
+        return {
+            'task_id': task_instance.task_id,
+            'run_id': task_instance.run_id,
+            'state': group_state,
+            'start_date': group_start_date,
+            'end_date': group_end_date,
+            'mapped_states': mapped_states,
+            'operator': task_instance.operator,
+            'execution_date': datetime_to_string(task_instance.execution_date),
+            'try_number': task_instance.try_number,
+        }
+
+    if is_mapped:
+        return get_mapped_summary(
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == task_instance.dag_id,
+                TaskInstance.run_id == task_instance.run_id,
+                TaskInstance.task_id == task_instance.task_id,
+                TaskInstance.map_index >= 0,
+            )
+            .all()
+        )
+
+    return summary
+
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run:
