bbovenzi commented on code in PR #24284:
URL: https://github.com/apache/airflow/pull/24284#discussion_r891430623
##########
airflow/www/views.py:
##########
@@ -250,62 +251,102 @@ def _safe_parse_datetime(v):
abort(400, f"Invalid datetime: {v!r}")
-def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
+def dag_to_grid(dag, dag_runs, session):
"""
- Create a nested dict representation of this TaskGroup and its children
used to construct
- the Graph.
+ Create a nested dict representation of the DAG's TaskGroup and its children
+ used to construct the Graph and Grid views.
"""
- if isinstance(task_item_or_group, AbstractOperator):
- return {
- 'id': task_item_or_group.task_id,
- 'instances': wwwutils.get_task_summaries(task_item_or_group,
dag_runs, session),
- 'label': task_item_or_group.label,
- 'extra_links': task_item_or_group.extra_links,
- 'is_mapped': task_item_or_group.is_mapped,
- }
+ query = (
+ session.query(
+ TaskInstance.task_id,
+ TaskInstance.run_id,
+ TaskInstance.map_index,
+ TaskInstance.state,
+ TaskInstance.start_date,
+ TaskInstance.end_date,
+ TaskInstance._try_number,
+ )
+ .filter(
+ TaskInstance.dag_id == dag.dag_id,
+ TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+ # Only get normal task instances or the first mapped task
+ TaskInstance.map_index <= 0,
+ )
+ .order_by(TaskInstance.task_id)
+ )
- # Task Group
- task_group = task_item_or_group
+ grouped_tis = {task_id: list(tis) for task_id, tis in
itertools.groupby(query, key=lambda ti: ti.task_id)}
- children = [task_group_to_grid(child, dag, dag_runs, session) for child in
task_group.topological_sort()]
+ def task_group_to_grid(item, dag_runs, grouped_tis):
+ if isinstance(item, AbstractOperator):
- def get_summary(dag_run, children):
- child_instances = [child['instances'] for child in children if
'instances' in child]
- child_instances = [item for sublist in child_instances for item in
sublist]
+ def _get_summary(task_instance):
+ try_count = (
+ task_instance._try_number
+ if task_instance._try_number != 0 or task_instance.state
in State.running
+ else task_instance._try_number + 1
+ )
- children_start_dates = [item['start_date'] for item in child_instances
if item]
- children_end_dates = [item['end_date'] for item in child_instances if
item]
- children_states = [item['state'] for item in child_instances if item]
+ return {
+ 'task_id': task_instance.task_id,
+ 'run_id': task_instance.run_id,
+ 'map_index': task_instance.map_index,
+ 'state': task_instance.state,
+ 'start_date': task_instance.start_date,
+ 'end_date': task_instance.end_date,
+ 'try_number': try_count,
Review Comment:
This no longer returns `mapped_states`, which gave a status breakdown of
mapped tasks. Do we still want that? If not, I need to change one bit of React
code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]