This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new a361291b222 [v2-10-test] Avoid grouping task instance stats by
try_number for dynamic mapped tasks (#44300) (#44319)
a361291b222 is described below
commit a361291b22211ffd197e8fdba9edfb6e22b456aa
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Nov 24 11:37:10 2024 +0000
[v2-10-test] Avoid grouping task instance stats by try_number for dynamic
mapped tasks (#44300) (#44319)
* [v2-10-test] Avoid grouping task instance stats by try_number for dynamic
mapped tasks (#44300)
(cherry picked from commit 5e52bd29abd690098ecf0701b8aab4792566eea3)
Co-authored-by: Shahar Epstein <[email protected]>
* Update test_views_grid.py
---------
Co-authored-by: Shahar Epstein <[email protected]>
---
airflow/www/views.py | 15 +++++++++++--
newsfragments/44300.bugfix.rst | 1 +
tests/www/views/test_views_grid.py | 44 ++++++++++++++++++++++++++++++++++++++
3 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bb88da2cdfa..222759e6e67 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -316,7 +316,10 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun],
session: Session) ->
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.state,
- TaskInstance.try_number,
+ case(
+ (TaskInstance.map_index == -1, TaskInstance.try_number),
+ else_=None,
+ ).label("try_number"),
func.min(TaskInstanceNote.content).label("note"),
func.count(func.coalesce(TaskInstance.state,
sqla.literal("no_status"))).label("state_count"),
func.min(TaskInstance.queued_dttm).label("queued_dttm"),
@@ -328,7 +331,15 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun],
session: Session) ->
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
)
- .group_by(TaskInstance.task_id, TaskInstance.run_id,
TaskInstance.state, TaskInstance.try_number)
+ .group_by(
+ TaskInstance.task_id,
+ TaskInstance.run_id,
+ TaskInstance.state,
+ case(
+ (TaskInstance.map_index == -1, TaskInstance.try_number),
+ else_=None,
+ ),
+ )
.order_by(TaskInstance.task_id, TaskInstance.run_id)
)
diff --git a/newsfragments/44300.bugfix.rst b/newsfragments/44300.bugfix.rst
new file mode 100644
index 00000000000..ffd4b07b2ab
--- /dev/null
+++ b/newsfragments/44300.bugfix.rst
@@ -0,0 +1 @@
+Fix stats of dynamic mapped tasks after automatic retries of failed tasks
diff --git a/tests/www/views/test_views_grid.py
b/tests/www/views/test_views_grid.py
index 0b822798801..7cafb6a4c8e 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -517,3 +517,47 @@ def test_next_run_datasets_404(admin_client):
resp = admin_client.get("/object/next_run_datasets/missingdag",
follow_redirects=True)
assert resp.status_code == 404, resp.json
assert resp.json == {"error": "can't find dag missingdag"}
+
+
+@pytest.mark.usefixtures("freeze_time_for_dagruns")
+def test_dynamic_mapped_task_with_retries(admin_client, dag_with_runs:
list[DagRun], session):
+ """
+ Test a DAG with a dynamic mapped task with retries
+ """
+ run1, run2 = dag_with_runs
+
+ for ti in run1.task_instances:
+ ti.state = TaskInstanceState.SUCCESS
+ for ti in sorted(run2.task_instances, key=lambda ti: (ti.task_id,
ti.map_index)):
+ if ti.task_id == "task1":
+ ti.state = TaskInstanceState.SUCCESS
+ elif ti.task_id == "group.mapped":
+ if ti.map_index == 0:
+ ti.state = TaskInstanceState.FAILED
+ ti.start_date = pendulum.DateTime(2021, 7, 1, 1, 0, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = pendulum.DateTime(2021, 7, 1, 1, 2, 3,
tzinfo=pendulum.UTC)
+ elif ti.map_index == 1:
+ ti.try_number = 1
+ ti.state = TaskInstanceState.SUCCESS
+ ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4,
tzinfo=pendulum.UTC)
+ ti.end_date = None
+ elif ti.map_index == 2:
+ ti.try_number = 2
+ ti.state = TaskInstanceState.FAILED
+ ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4,
tzinfo=pendulum.UTC)
+ ti.end_date = None
+ elif ti.map_index == 3:
+ ti.try_number = 3
+ ti.state = TaskInstanceState.SUCCESS
+ ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4,
tzinfo=pendulum.UTC)
+ ti.end_date = None
+ session.flush()
+
+ resp = admin_client.get(f"/object/grid_data?dag_id={DAG_ID}",
follow_redirects=True)
+
+ assert resp.status_code == 200, resp.json
+
+ assert
resp.json["groups"]["children"][-1]["children"][-1]["instances"][-1]["mapped_states"]
== {
+ "failed": 2,
+ "success": 2,
+ }