This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 753b9eced5 Show mapped task groups in grid view (#28208)
753b9eced5 is described below
commit 753b9eced5938a4216ddaf0c4afcf89bd7a39a4e
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed Dec 14 13:45:45 2022 -0600
Show mapped task groups in grid view (#28208)
* mapped task groups in grid
* Use .all() instead of list() to get instances
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/www/static/js/dag/InstanceTooltip.tsx | 7 +-
.../static/js/dag/details/taskInstance/Details.tsx | 5 +-
.../static/js/dag/details/taskInstance/index.tsx | 6 +-
airflow/www/static/js/dag/grid/renderTaskRows.tsx | 9 ++-
airflow/www/views.py | 84 ++++++++++++++++++++--
tests/www/views/test_views_grid.py | 79 +++++++++++++++++++-
6 files changed, 175 insertions(+), 15 deletions(-)
diff --git a/airflow/www/static/js/dag/InstanceTooltip.tsx b/airflow/www/static/js/dag/InstanceTooltip.tsx
index f5aad277bb..a31b712b19 100644
--- a/airflow/www/static/js/dag/InstanceTooltip.tsx
+++ b/airflow/www/static/js/dag/InstanceTooltip.tsx
@@ -44,7 +44,7 @@ const InstanceTooltip = ({
const numMap = finalStatesMap();
let numMapped = 0;
- if (isGroup && group.children) {
+ if (isGroup && group.children && !isMapped) {
group.children.forEach((child) => {
const taskInstance = child.instances.find((ti) => ti.runId === runId);
if (taskInstance) {
@@ -52,7 +52,9 @@ const InstanceTooltip = ({
if (numMap.has(stateKey)) numMap.set(stateKey, (numMap.get(stateKey) || 0) + 1);
}
});
- } else if (isMapped && mappedStates) {
+ }
+
+ if (isMapped && mappedStates) {
Object.keys(mappedStates).forEach((stateKey) => {
const num = mappedStates[stateKey];
numMapped += num;
@@ -83,6 +85,7 @@ const InstanceTooltip = ({
{numMapped}
{' '}
mapped task
+ {isGroup && ' group'}
{numMapped > 1 && 's'}
</Text>
)}
diff --git a/airflow/www/static/js/dag/details/taskInstance/Details.tsx b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
index 7d9dfa1a1c..6deb24617f 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Details.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
@@ -74,7 +74,7 @@ const Details = ({ instance, group, dagId }: Props) => {
const numMap = finalStatesMap();
let numMapped = 0;
- if (isGroup) {
+ if (isGroup && !isMapped) {
group.children?.forEach((child) => {
const taskInstance = child.instances.find((ti) => ti.runId === runId);
if (taskInstance) {
@@ -167,7 +167,8 @@ const Details = ({ instance, group, dagId }: Props) => {
<Td colSpan={2}>
{numMapped}
{' '}
- {numMapped === 1 ? 'Task ' : 'Tasks '}
+ {isGroup ? 'Task Group' : 'Task'}
+ {numMapped === 1 ? ' ' : 's '}
Mapped
</Td>
</Tr>
diff --git a/airflow/www/static/js/dag/details/taskInstance/index.tsx b/airflow/www/static/js/dag/details/taskInstance/index.tsx
index 329d84533a..894b22a5c8 100644
--- a/airflow/www/static/js/dag/details/taskInstance/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/index.tsx
@@ -101,7 +101,7 @@ const TaskInstance = ({
isPreferedTabDisplayed = true;
break;
case 1:
- isPreferedTabDisplayed = !isGroup;
+ isPreferedTabDisplayed = !isGroup || (isGroup && !!isMapped);
break;
default:
isPreferedTabDisplayed = false;
@@ -139,7 +139,7 @@ const TaskInstance = ({
<Tab>
<Text as="strong">Details</Text>
</Tab>
- {isMappedTaskSummary && (
+ {isMappedTaskSummary && !isGroup && (
<Tab>
<Text as="strong">Mapped Tasks</Text>
</Tab>
@@ -218,7 +218,7 @@ const TaskInstance = ({
{/* Mapped Task Instances Tab */}
{
- isMappedTaskSummary && (
+ isMappedTaskSummary && !isGroup && (
<TabPanel>
<MappedInstances
dagId={dagId}
diff --git a/airflow/www/static/js/dag/grid/renderTaskRows.tsx b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
index 9e4b3c0b37..9914162889 100644
--- a/airflow/www/static/js/dag/grid/renderTaskRows.tsx
+++ b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
@@ -44,6 +44,7 @@ interface RowProps {
openGroupIds?: string[];
onToggleGroups?: (groupIds: string[]) => void;
hoveredTaskState?: string | null;
+ isParentMapped?: boolean;
}
const renderTaskRows = ({
@@ -114,6 +115,7 @@ const Row = (props: RowProps) => {
openGroupIds = [],
onToggleGroups = () => {},
hoveredTaskState,
+ isParentMapped,
} = props;
const { colors } = useTheme();
const { selected, onSelect } = useSelection();
@@ -168,7 +170,7 @@ const Row = (props: RowProps) => {
<TaskName
onToggle={memoizedToggle}
isGroup={isGroup}
- isMapped={task.isMapped}
+ isMapped={task.isMapped && !isParentMapped}
label={task.label || task.id || ''}
isOpen={isOpen}
level={level}
@@ -191,7 +193,10 @@ const Row = (props: RowProps) => {
</Tr>
{isGroup && isOpen && (
renderTaskRows({
- ...props, level: level + 1, openParentCount: openParentCount + 1,
+ ...props,
+ level: level + 1,
+ openParentCount: openParentCount + 1,
+ isParentMapped: task.isMapped,
})
)}
</>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 16b4818d7d..cf6d371441 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -121,7 +121,7 @@ from airflow.utils.net import get_hostname
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.strings import to_boolean
-from airflow.utils.task_group import task_group_to_dict
+from airflow.utils.task_group import MappedTaskGroup, task_group_to_dict
from airflow.utils.timezone import td_format, utcnow
from airflow.version import version
from airflow.www import auth, utils as wwwutils
@@ -281,7 +281,7 @@ def dag_to_grid(dag, dag_runs, session):
grouped_tis = {task_id: list(tis) for task_id, tis in itertools.groupby(query, key=lambda ti: ti.task_id)}
- def task_group_to_grid(item, dag_runs, grouped_tis):
+ def task_group_to_grid(item, dag_runs, grouped_tis, is_parent_mapped=False):
if isinstance(item, AbstractOperator):
def _get_summary(task_instance):
@@ -341,7 +341,7 @@ def dag_to_grid(dag, dag_runs, session):
set_overall_state(record)
yield record
- if isinstance(item, MappedOperator):
+ if isinstance(item, MappedOperator) or is_parent_mapped:
instances = list(_mapped_summary(grouped_tis.get(item.task_id, [])))
else:
instances = list(map(_get_summary, grouped_tis.get(item.task_id, [])))
@@ -351,16 +351,18 @@ def dag_to_grid(dag, dag_runs, session):
"instances": instances,
"label": item.label,
"extra_links": item.extra_links,
- "is_mapped": isinstance(item, MappedOperator),
+ "is_mapped": isinstance(item, MappedOperator) or
is_parent_mapped,
"has_outlet_datasets": any(isinstance(i, Dataset) for i in
(item.outlets or [])),
"operator": item.operator_name,
}
# Task Group
task_group = item
+ group_is_mapped = isinstance(task_group, MappedTaskGroup)
children = [
- task_group_to_grid(child, dag_runs, grouped_tis) for child in task_group.topological_sort()
+ task_group_to_grid(child, dag_runs, grouped_tis, group_is_mapped)
+ for child in task_group.topological_sort()
]
def get_summary(dag_run, children):
@@ -389,6 +391,66 @@ def dag_to_grid(dag, dag_runs, session):
"end_date": group_end_date,
}
+ def get_mapped_group_summaries(run_ids, children):
+ mapped_instances = (
+ session.query(
+ TaskInstance.task_id, TaskInstance.state, TaskInstance.run_id, TaskInstance.map_index
+ )
+ .filter(
+ TaskInstance.dag_id == dag.dag_id,
+ TaskInstance.task_id.in_(child["id"] for child in children),
+ TaskInstance.run_id.in_(run_ids),
+ )
+ .all()
+ )
+
+ def get_mapped_group_summary(run_id, mapped_instances):
+ map_length = max(mi.map_index for mi in mapped_instances) + 1
+ child_instances = [child["instances"] for child in children if "instances" in child]
+ child_instances = [
+ item for sublist in child_instances for item in sublist if item["run_id"] == run_id
+ ]
+
+ children_start_dates = (item["start_date"] for item in child_instances if item)
+ children_end_dates = (item["end_date"] for item in child_instances if item)
+ children_states = {item["state"] for item in child_instances if item}
+
+ mapped_states = {}
+ for i in range(0, map_length):
+ child_states = [mi.state for mi in mapped_instances if mi.map_index == i]
+ for state in wwwutils.priority:
+ if state in child_states:
+ value = state.value if state is not None else "no_status"
+ if value in mapped_states:
+ mapped_states[value] += 1
+ else:
+ mapped_states[value] = 1
+ break
+
+ group_state = None
+ for state in wwwutils.priority:
+ if state in children_states:
+ group_state = state
+ break
+ group_start_date = min(filter(None, children_start_dates), default=None)
+ group_end_date = max(filter(None, children_end_dates), default=None)
+
+ return {
+ "task_id": task_group.group_id,
+ "run_id": run_id,
+ "state": group_state,
+ "start_date": group_start_date,
+ "end_date": group_end_date,
+ "mapped_states": mapped_states,
+ }
+
+ return [
+ get_mapped_group_summary(
+ run_id, mapped_instances=[mi for mi in mapped_instances if mi.run_id == run_id]
+ )
+ for run_id in run_ids
+ ]
+
# We don't need to calculate summaries for the root
if task_group.group_id is None:
return {
@@ -398,6 +460,18 @@ def dag_to_grid(dag, dag_runs, session):
"instances": [],
}
+ if group_is_mapped:
+ mapped_group_summaries = get_mapped_group_summaries([dr.run_id for dr in dag_runs], children)
+
+ return {
+ "id": task_group.group_id,
+ "label": task_group.label,
+ "children": children,
+ "tooltip": task_group.tooltip,
+ "instances": mapped_group_summaries,
+ "is_mapped": group_is_mapped,
+ }
+
group_summaries = [get_summary(dr, children) for dr in dag_runs]
return {
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index 6985ce103b..f0fef3a63b 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -22,6 +22,7 @@ import pytest
from dateutil.tz import UTC
from airflow.datasets import Dataset
+from airflow.decorators import task_group
from airflow.lineage.entities import File
from airflow.models import DagBag
from airflow.models.dagrun import DagRun
@@ -64,6 +65,12 @@ def dag_without_runs(dag_maker, session, app, monkeypatch):
with dag_maker(dag_id=DAG_ID, serialized=True, session=session):
EmptyOperator(task_id="task1")
+
+ @task_group
+ def mapped_task_group(arg1):
+ return MockOperator(task_id="subtask2", arg1=arg1)
+
+ mapped_task_group.expand(arg1=["a", "b", "c"])
with TaskGroup(group_id="group"):
MockOperator.partial(task_id="mapped").expand(arg1=["a", "b", "c", "d"])
@@ -102,6 +109,24 @@ def test_no_runs(admin_client, dag_without_runs):
"label": "task1",
"operator": "EmptyOperator",
},
+ {
+ "children": [
+ {
+ "extra_links": [],
+ "has_outlet_datasets": False,
+ "id": "mapped_task_group.subtask2",
+ "instances": [],
+ "is_mapped": True,
+ "label": "subtask2",
+ "operator": "MockOperator",
+ }
+ ],
+ "is_mapped": True,
+ "id": "mapped_task_group",
+ "instances": [],
+ "label": "mapped_task_group",
+ "tooltip": "",
+ },
{
"children": [
{
@@ -231,6 +256,58 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
"label": "task1",
"operator": "EmptyOperator",
},
+ {
+ "children": [
+ {
+ "extra_links": [],
+ "has_outlet_datasets": False,
+ "id": "mapped_task_group.subtask2",
+ "instances": [
+ {
+ "run_id": "run_1",
+ "mapped_states": {"success": 3},
+ "start_date": None,
+ "end_date": None,
+ "state": "success",
+ "task_id": "mapped_task_group.subtask2",
+ },
+ {
+ "run_id": "run_2",
+ "mapped_states": {"no_status": 3},
+ "start_date": None,
+ "end_date": None,
+ "state": None,
+ "task_id": "mapped_task_group.subtask2",
+ },
+ ],
+ "is_mapped": True,
+ "label": "subtask2",
+ "operator": "MockOperator",
+ }
+ ],
+ "is_mapped": True,
+ "id": "mapped_task_group",
+ "instances": [
+ {
+ "end_date": None,
+ "run_id": "run_1",
+ "mapped_states": {"success": 3},
+ "start_date": None,
+ "state": "success",
+ "task_id": "mapped_task_group",
+ },
+ {
+ "run_id": "run_2",
+ "start_date": None,
+ "end_date": None,
+ "state": None,
+ "mapped_states": {"no_status": 3},
+ "task_id": "mapped_task_group",
+ },
+ ],
+ "label": "mapped_task_group",
+ "tooltip": "",
+ },
{
"children": [
{
@@ -291,7 +368,7 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
def test_query_count(dag_with_runs, session):
run1, run2 = dag_with_runs
- with assert_queries_count(1):
+ with assert_queries_count(2):
dag_to_grid(run1.dag, (run1, run2), session)