bbovenzi commented on code in PR #24284:
URL: https://github.com/apache/airflow/pull/24284#discussion_r891576098


##########
airflow/www/views.py:
##########
@@ -250,62 +251,102 @@ def _safe_parse_datetime(v):
         abort(400, f"Invalid datetime: {v!r}")
 
 
-def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
+def dag_to_grid(dag, dag_runs, session):
     """
-    Create a nested dict representation of this TaskGroup and its children 
used to construct
-    the Graph.
+    Create a nested dict representation of the DAG's TaskGroup and its children
+    used to construct the Graph and Grid views.
     """
-    if isinstance(task_item_or_group, AbstractOperator):
-        return {
-            'id': task_item_or_group.task_id,
-            'instances': wwwutils.get_task_summaries(task_item_or_group, 
dag_runs, session),
-            'label': task_item_or_group.label,
-            'extra_links': task_item_or_group.extra_links,
-            'is_mapped': task_item_or_group.is_mapped,
-        }
+    query = (
+        session.query(
+            TaskInstance.task_id,
+            TaskInstance.run_id,
+            TaskInstance.map_index,
+            TaskInstance.state,
+            TaskInstance.start_date,
+            TaskInstance.end_date,
+            TaskInstance._try_number,
+        )
+        .filter(
+            TaskInstance.dag_id == dag.dag_id,
+            TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+            # Only get normal task instances or the first mapped task
+            TaskInstance.map_index <= 0,
+        )
+        .order_by(TaskInstance.task_id)
+    )
 
-    # Task Group
-    task_group = task_item_or_group
+    grouped_tis = {task_id: list(tis) for task_id, tis in 
itertools.groupby(query, key=lambda ti: ti.task_id)}
 
-    children = [task_group_to_grid(child, dag, dag_runs, session) for child in 
task_group.topological_sort()]
+    def task_group_to_grid(item, dag_runs, grouped_tis):
+        if isinstance(item, AbstractOperator):
 
-    def get_summary(dag_run, children):
-        child_instances = [child['instances'] for child in children if 
'instances' in child]
-        child_instances = [item for sublist in child_instances for item in 
sublist]
+            def _get_summary(task_instance):
+                try_count = (
+                    task_instance._try_number
+                    if task_instance._try_number != 0 or task_instance.state 
in State.running
+                    else task_instance._try_number + 1
+                )
 
-        children_start_dates = [item['start_date'] for item in child_instances 
if item]
-        children_end_dates = [item['end_date'] for item in child_instances if 
item]
-        children_states = [item['state'] for item in child_instances if item]
+                return {
+                    'task_id': task_instance.task_id,
+                    'run_id': task_instance.run_id,
+                    'map_index': task_instance.map_index,
+                    'state': task_instance.state,
+                    'start_date': task_instance.start_date,
+                    'end_date': task_instance.end_date,
+                    'try_number': try_count,

Review Comment:
   We can make the decision on the server. We were giving an overall status and 
a breakdown of the statuses of all the children that make up that overall status, 
but we can provide just the overall status if you'd like.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to