This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 79b8cfc0fa Sort data before groupby in TIS duration calculation
(#33535)
79b8cfc0fa is described below
commit 79b8cfc0fa77f11491fc1de4d5f009e176aa7c3a
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu Aug 24 12:58:29 2023 +0400
Sort data before groupby in TIS duration calculation (#33535)
---
airflow/www/views.py | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 018b720b4f..0e55ed60ab 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3236,10 +3236,12 @@ class Airflow(AirflowBaseView):
if failed_task_instance.duration:
fails_totals[dict_key] += failed_task_instance.duration
- # we must group any mapped TIs by dag_id, task_id, run_id
+ # We must group any mapped TIs by dag_id, task_id, run_id
+ def grouping_key(ti: TaskInstance):
+ return ti.dag_id, ti.task_id, ti.run_id
+
mapped_tis = set()
- tis_grouped = itertools.groupby(task_instances, lambda x: (x.dag_id,
x.task_id, x.run_id))
- for _, group in tis_grouped:
+ for _, group in itertools.groupby(sorted(task_instances,
key=grouping_key), key=grouping_key):
tis = list(group)
duration = sum(x.duration for x in tis if x.duration)
if duration: