This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new a361291b222 [v2-10-test] Avoid grouping task instance stats by try_number for dynamic mapped tasks (#44300) (#44319)
a361291b222 is described below

commit a361291b22211ffd197e8fdba9edfb6e22b456aa
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Nov 24 11:37:10 2024 +0000

    [v2-10-test] Avoid grouping task instance stats by try_number for dynamic mapped tasks (#44300) (#44319)
    
    * [v2-10-test] Avoid grouping task instance stats by try_number for dynamic mapped tasks (#44300)
    (cherry picked from commit 5e52bd29abd690098ecf0701b8aab4792566eea3)
    
    Co-authored-by: Shahar Epstein <[email protected]>
    
    * Update test_views_grid.py
    
    ---------
    
    Co-authored-by: Shahar Epstein <[email protected]>
---
 airflow/www/views.py               | 15 +++++++++++--
 newsfragments/44300.bugfix.rst     |  1 +
 tests/www/views/test_views_grid.py | 44 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index bb88da2cdfa..222759e6e67 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -316,7 +316,10 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) ->
             TaskInstance.task_id,
             TaskInstance.run_id,
             TaskInstance.state,
-            TaskInstance.try_number,
+            case(
+                (TaskInstance.map_index == -1, TaskInstance.try_number),
+                else_=None,
+            ).label("try_number"),
             func.min(TaskInstanceNote.content).label("note"),
             func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"),
             func.min(TaskInstance.queued_dttm).label("queued_dttm"),
@@ -328,7 +331,15 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) ->
             TaskInstance.dag_id == dag.dag_id,
             TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
         )
-        .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.try_number)
+        .group_by(
+            TaskInstance.task_id,
+            TaskInstance.run_id,
+            TaskInstance.state,
+            case(
+                (TaskInstance.map_index == -1, TaskInstance.try_number),
+                else_=None,
+            ),
+        )
         .order_by(TaskInstance.task_id, TaskInstance.run_id)
     )
 
diff --git a/newsfragments/44300.bugfix.rst b/newsfragments/44300.bugfix.rst
new file mode 100644
index 00000000000..ffd4b07b2ab
--- /dev/null
+++ b/newsfragments/44300.bugfix.rst
@@ -0,0 +1 @@
+Fix stats of dynamic mapped tasks after automatic retries of failed tasks
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index 0b822798801..7cafb6a4c8e 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -517,3 +517,47 @@ def test_next_run_datasets_404(admin_client):
     resp = admin_client.get("/object/next_run_datasets/missingdag", follow_redirects=True)
     assert resp.status_code == 404, resp.json
     assert resp.json == {"error": "can't find dag missingdag"}
+
+
+@pytest.mark.usefixtures("freeze_time_for_dagruns")
+def test_dynamic_mapped_task_with_retries(admin_client, dag_with_runs: list[DagRun], session):
+    """
+    Test a DAG with a dynamic mapped task with retries
+    """
+    run1, run2 = dag_with_runs
+
+    for ti in run1.task_instances:
+        ti.state = TaskInstanceState.SUCCESS
+    for ti in sorted(run2.task_instances, key=lambda ti: (ti.task_id, ti.map_index)):
+        if ti.task_id == "task1":
+            ti.state = TaskInstanceState.SUCCESS
+        elif ti.task_id == "group.mapped":
+            if ti.map_index == 0:
+                ti.state = TaskInstanceState.FAILED
+                ti.start_date = pendulum.DateTime(2021, 7, 1, 1, 0, 0, tzinfo=pendulum.UTC)
+                ti.end_date = pendulum.DateTime(2021, 7, 1, 1, 2, 3, tzinfo=pendulum.UTC)
+            elif ti.map_index == 1:
+                ti.try_number = 1
+                ti.state = TaskInstanceState.SUCCESS
+                ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC)
+                ti.end_date = None
+            elif ti.map_index == 2:
+                ti.try_number = 2
+                ti.state = TaskInstanceState.FAILED
+                ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC)
+                ti.end_date = None
+            elif ti.map_index == 3:
+                ti.try_number = 3
+                ti.state = TaskInstanceState.SUCCESS
+                ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC)
+                ti.end_date = None
+    session.flush()
+
+    resp = admin_client.get(f"/object/grid_data?dag_id={DAG_ID}", follow_redirects=True)
+
+    assert resp.status_code == 200, resp.json
+
+    assert resp.json["groups"]["children"][-1]["children"][-1]["instances"][-1]["mapped_states"] == {
+        "failed": 2,
+        "success": 2,
+    }

Reply via email to