This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 753b9eced5 Show mapped task groups in grid view (#28208)
753b9eced5 is described below

commit 753b9eced5938a4216ddaf0c4afcf89bd7a39a4e
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed Dec 14 13:45:45 2022 -0600

    Show mapped task groups in grid view (#28208)
    
    * mapped task groups in grid
    
    * Use .all() instead of list() to get instances
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/www/static/js/dag/InstanceTooltip.tsx      |  7 +-
 .../static/js/dag/details/taskInstance/Details.tsx |  5 +-
 .../static/js/dag/details/taskInstance/index.tsx   |  6 +-
 airflow/www/static/js/dag/grid/renderTaskRows.tsx  |  9 ++-
 airflow/www/views.py                               | 84 ++++++++++++++++++++--
 tests/www/views/test_views_grid.py                 | 79 +++++++++++++++++++-
 6 files changed, 175 insertions(+), 15 deletions(-)

diff --git a/airflow/www/static/js/dag/InstanceTooltip.tsx b/airflow/www/static/js/dag/InstanceTooltip.tsx
index f5aad277bb..a31b712b19 100644
--- a/airflow/www/static/js/dag/InstanceTooltip.tsx
+++ b/airflow/www/static/js/dag/InstanceTooltip.tsx
@@ -44,7 +44,7 @@ const InstanceTooltip = ({
 
   const numMap = finalStatesMap();
   let numMapped = 0;
-  if (isGroup && group.children) {
+  if (isGroup && group.children && !isMapped) {
     group.children.forEach((child) => {
       const taskInstance = child.instances.find((ti) => ti.runId === runId);
       if (taskInstance) {
@@ -52,7 +52,9 @@ const InstanceTooltip = ({
         if (numMap.has(stateKey)) numMap.set(stateKey, (numMap.get(stateKey) || 0) + 1);
       }
     });
-  } else if (isMapped && mappedStates) {
+  }
+
+  if (isMapped && mappedStates) {
     Object.keys(mappedStates).forEach((stateKey) => {
       const num = mappedStates[stateKey];
       numMapped += num;
@@ -83,6 +85,7 @@ const InstanceTooltip = ({
           {numMapped}
           {' '}
           mapped task
+          {isGroup && ' group'}
           {numMapped > 1 && 's'}
         </Text>
       )}
diff --git a/airflow/www/static/js/dag/details/taskInstance/Details.tsx b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
index 7d9dfa1a1c..6deb24617f 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Details.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
@@ -74,7 +74,7 @@ const Details = ({ instance, group, dagId }: Props) => {
 
   const numMap = finalStatesMap();
   let numMapped = 0;
-  if (isGroup) {
+  if (isGroup && !isMapped) {
     group.children?.forEach((child) => {
       const taskInstance = child.instances.find((ti) => ti.runId === runId);
       if (taskInstance) {
@@ -167,7 +167,8 @@ const Details = ({ instance, group, dagId }: Props) => {
               <Td colSpan={2}>
                 {numMapped}
                 {' '}
-                {numMapped === 1 ? 'Task ' : 'Tasks '}
+                {isGroup ? 'Task Group' : 'Task'}
+                {numMapped === 1 ? ' ' : 's '}
                 Mapped
               </Td>
             </Tr>
diff --git a/airflow/www/static/js/dag/details/taskInstance/index.tsx b/airflow/www/static/js/dag/details/taskInstance/index.tsx
index 329d84533a..894b22a5c8 100644
--- a/airflow/www/static/js/dag/details/taskInstance/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/index.tsx
@@ -101,7 +101,7 @@ const TaskInstance = ({
       isPreferedTabDisplayed = true;
       break;
     case 1:
-      isPreferedTabDisplayed = !isGroup;
+      isPreferedTabDisplayed = !isGroup || (isGroup && !!isMapped);
       break;
     default:
       isPreferedTabDisplayed = false;
@@ -139,7 +139,7 @@ const TaskInstance = ({
           <Tab>
             <Text as="strong">Details</Text>
           </Tab>
-          {isMappedTaskSummary && (
+          {isMappedTaskSummary && !isGroup && (
             <Tab>
               <Text as="strong">Mapped Tasks</Text>
             </Tab>
@@ -218,7 +218,7 @@ const TaskInstance = ({
 
           {/* Mapped Task Instances Tab */}
           {
-            isMappedTaskSummary && (
+            isMappedTaskSummary && !isGroup && (
               <TabPanel>
                 <MappedInstances
                   dagId={dagId}
diff --git a/airflow/www/static/js/dag/grid/renderTaskRows.tsx b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
index 9e4b3c0b37..9914162889 100644
--- a/airflow/www/static/js/dag/grid/renderTaskRows.tsx
+++ b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
@@ -44,6 +44,7 @@ interface RowProps {
   openGroupIds?: string[];
   onToggleGroups?: (groupIds: string[]) => void;
   hoveredTaskState?: string | null;
+  isParentMapped?: boolean;
 }
 
 const renderTaskRows = ({
@@ -114,6 +115,7 @@ const Row = (props: RowProps) => {
     openGroupIds = [],
     onToggleGroups = () => {},
     hoveredTaskState,
+    isParentMapped,
   } = props;
   const { colors } = useTheme();
   const { selected, onSelect } = useSelection();
@@ -168,7 +170,7 @@ const Row = (props: RowProps) => {
           <TaskName
             onToggle={memoizedToggle}
             isGroup={isGroup}
-            isMapped={task.isMapped}
+            isMapped={task.isMapped && !isParentMapped}
             label={task.label || task.id || ''}
             isOpen={isOpen}
             level={level}
@@ -191,7 +193,10 @@ const Row = (props: RowProps) => {
       </Tr>
       {isGroup && isOpen && (
         renderTaskRows({
-          ...props, level: level + 1, openParentCount: openParentCount + 1,
+          ...props,
+          level: level + 1,
+          openParentCount: openParentCount + 1,
+          isParentMapped: task.isMapped,
         })
       )}
     </>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 16b4818d7d..cf6d371441 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -121,7 +121,7 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.strings import to_boolean
-from airflow.utils.task_group import task_group_to_dict
+from airflow.utils.task_group import MappedTaskGroup, task_group_to_dict
 from airflow.utils.timezone import td_format, utcnow
 from airflow.version import version
 from airflow.www import auth, utils as wwwutils
@@ -281,7 +281,7 @@ def dag_to_grid(dag, dag_runs, session):
 
     grouped_tis = {task_id: list(tis) for task_id, tis in itertools.groupby(query, key=lambda ti: ti.task_id)}
 
-    def task_group_to_grid(item, dag_runs, grouped_tis):
+    def task_group_to_grid(item, dag_runs, grouped_tis, is_parent_mapped=False):
         if isinstance(item, AbstractOperator):
 
             def _get_summary(task_instance):
@@ -341,7 +341,7 @@ def dag_to_grid(dag, dag_runs, session):
                     set_overall_state(record)
                     yield record
 
-            if isinstance(item, MappedOperator):
+            if isinstance(item, MappedOperator) or is_parent_mapped:
                 instances = list(_mapped_summary(grouped_tis.get(item.task_id, [])))
             else:
                 instances = list(map(_get_summary, grouped_tis.get(item.task_id, [])))
@@ -351,16 +351,18 @@ def dag_to_grid(dag, dag_runs, session):
                 "instances": instances,
                 "label": item.label,
                 "extra_links": item.extra_links,
-                "is_mapped": isinstance(item, MappedOperator),
+                "is_mapped": isinstance(item, MappedOperator) or is_parent_mapped,
                 "has_outlet_datasets": any(isinstance(i, Dataset) for i in (item.outlets or [])),
                 "operator": item.operator_name,
             }
 
         # Task Group
         task_group = item
+        group_is_mapped = isinstance(task_group, MappedTaskGroup)
 
         children = [
-            task_group_to_grid(child, dag_runs, grouped_tis) for child in task_group.topological_sort()
+            task_group_to_grid(child, dag_runs, grouped_tis, group_is_mapped)
+            for child in task_group.topological_sort()
         ]
 
         def get_summary(dag_run, children):
@@ -389,6 +391,66 @@ def dag_to_grid(dag, dag_runs, session):
                 "end_date": group_end_date,
             }
 
+        def get_mapped_group_summaries(run_ids, children):
+            mapped_instances = (
+                session.query(
+                    TaskInstance.task_id, TaskInstance.state, TaskInstance.run_id, TaskInstance.map_index
+                )
+                .filter(
+                    TaskInstance.dag_id == dag.dag_id,
+                    TaskInstance.task_id.in_(child["id"] for child in children),
+                    TaskInstance.run_id.in_(run_ids),
+                )
+                .all()
+            )
+
+            def get_mapped_group_summary(run_id, mapped_instances):
+                map_length = max(mi.map_index for mi in mapped_instances) + 1
+                child_instances = [child["instances"] for child in children if "instances" in child]
+                child_instances = [
+                    item for sublist in child_instances for item in sublist if item["run_id"] == run_id
+                ]
+
+                children_start_dates = (item["start_date"] for item in child_instances if item)
+                children_end_dates = (item["end_date"] for item in child_instances if item)
+                children_states = {item["state"] for item in child_instances if item}
+
+                mapped_states = {}
+                for i in range(0, map_length):
+                    child_states = [mi.state for mi in mapped_instances if mi.map_index == i]
+                    for state in wwwutils.priority:
+                        if state in child_states:
+                            value = state.value if state is not None else "no_status"
+                            if value in mapped_states:
+                                mapped_states[value] += 1
+                            else:
+                                mapped_states[value] = 1
+                            break
+
+                group_state = None
+                for state in wwwutils.priority:
+                    if state in children_states:
+                        group_state = state
+                        break
+                group_start_date = min(filter(None, children_start_dates), default=None)
+                group_end_date = max(filter(None, children_end_dates), default=None)
+
+                return {
+                    "task_id": task_group.group_id,
+                    "run_id": run_id,
+                    "state": group_state,
+                    "start_date": group_start_date,
+                    "end_date": group_end_date,
+                    "mapped_states": mapped_states,
+                }
+
+            return [
+                get_mapped_group_summary(
+                    run_id, mapped_instances=[mi for mi in mapped_instances if mi.run_id == run_id]
+                )
+                for run_id in run_ids
+            ]
+
         # We don't need to calculate summaries for the root
         if task_group.group_id is None:
             return {
@@ -398,6 +460,18 @@ def dag_to_grid(dag, dag_runs, session):
                 "instances": [],
             }
 
+        if group_is_mapped:
+            mapped_group_summaries = get_mapped_group_summaries([dr.run_id for dr in dag_runs], children)
+
+            return {
+                "id": task_group.group_id,
+                "label": task_group.label,
+                "children": children,
+                "tooltip": task_group.tooltip,
+                "instances": mapped_group_summaries,
+                "is_mapped": group_is_mapped,
+            }
+
         group_summaries = [get_summary(dr, children) for dr in dag_runs]
 
         return {
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index 6985ce103b..f0fef3a63b 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -22,6 +22,7 @@ import pytest
 from dateutil.tz import UTC
 
 from airflow.datasets import Dataset
+from airflow.decorators import task_group
 from airflow.lineage.entities import File
 from airflow.models import DagBag
 from airflow.models.dagrun import DagRun
@@ -64,6 +65,12 @@ def dag_without_runs(dag_maker, session, app, monkeypatch):
 
         with dag_maker(dag_id=DAG_ID, serialized=True, session=session):
             EmptyOperator(task_id="task1")
+
+            @task_group
+            def mapped_task_group(arg1):
+                return MockOperator(task_id="subtask2", arg1=arg1)
+
+            mapped_task_group.expand(arg1=["a", "b", "c"])
             with TaskGroup(group_id="group"):
                 MockOperator.partial(task_id="mapped").expand(arg1=["a", "b", "c", "d"])
 
@@ -102,6 +109,24 @@ def test_no_runs(admin_client, dag_without_runs):
                     "label": "task1",
                     "operator": "EmptyOperator",
                 },
+                {
+                    "children": [
+                        {
+                            "extra_links": [],
+                            "has_outlet_datasets": False,
+                            "id": "mapped_task_group.subtask2",
+                            "instances": [],
+                            "is_mapped": True,
+                            "label": "subtask2",
+                            "operator": "MockOperator",
+                        }
+                    ],
+                    "is_mapped": True,
+                    "id": "mapped_task_group",
+                    "instances": [],
+                    "label": "mapped_task_group",
+                    "tooltip": "",
+                },
                 {
                     "children": [
                         {
@@ -231,6 +256,58 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
                     "label": "task1",
                     "operator": "EmptyOperator",
                 },
+                {
+                    "children": [
+                        {
+                            "extra_links": [],
+                            "has_outlet_datasets": False,
+                            "id": "mapped_task_group.subtask2",
+                            "instances": [
+                                {
+                                    "run_id": "run_1",
+                                    "mapped_states": {"success": 3},
+                                    "start_date": None,
+                                    "end_date": None,
+                                    "state": "success",
+                                    "task_id": "mapped_task_group.subtask2",
+                                },
+                                {
+                                    "run_id": "run_2",
+                                    "mapped_states": {"no_status": 3},
+                                    "start_date": None,
+                                    "end_date": None,
+                                    "state": None,
+                                    "task_id": "mapped_task_group.subtask2",
+                                },
+                            ],
+                            "is_mapped": True,
+                            "label": "subtask2",
+                            "operator": "MockOperator",
+                        }
+                    ],
+                    "is_mapped": True,
+                    "id": "mapped_task_group",
+                    "instances": [
+                        {
+                            "end_date": None,
+                            "run_id": "run_1",
+                            "mapped_states": {"success": 3},
+                            "start_date": None,
+                            "state": "success",
+                            "task_id": "mapped_task_group",
+                        },
+                        {
+                            "run_id": "run_2",
+                            "start_date": None,
+                            "end_date": None,
+                            "state": None,
+                            "mapped_states": {"no_status": 3},
+                            "task_id": "mapped_task_group",
+                        },
+                    ],
+                    "label": "mapped_task_group",
+                    "tooltip": "",
+                },
                 {
                     "children": [
                         {
@@ -291,7 +368,7 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
 
 def test_query_count(dag_with_runs, session):
     run1, run2 = dag_with_runs
-    with assert_queries_count(1):
+    with assert_queries_count(2):
         dag_to_grid(run1.dag, (run1, run2), session)
 
 

Reply via email to