This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 800f733c22f fix: enable api to clear ti instances by specifying map indexes (#56346)
800f733c22f is described below

commit 800f733c22f1e1c480f870a6aa5e0f0e4cea2a53
Author: Zhen-Lun (Kevin) Hong <[email protected]>
AuthorDate: Tue Oct 21 18:59:25 2025 +0800

    fix: enable api to clear ti instances by specifying map indexes (#56346)
    
    * fix: enable api to clear ti instances by specifying map indexes
    
    * chore: add tests for clearing mapped task instances from api endpoint
    
    * chore: add descriptions to task_ids in the payload
    
    * fix: deal with tests that were broken when map_indexes was introduced
    
    * chore: generate datamodel and api spec
    
    * chore: rewrite clear task_ids for clarity and remove duplicate tasks
    
    * Small adjustments
    
    ---------
    
    Co-authored-by: pierrejeambrun <[email protected]>
---
 .../core_api/datamodels/task_instances.py          |   6 +-
 .../core_api/openapi/v2-rest-api-generated.yaml    |   3 +
 .../core_api/routes/public/task_instances.py       |  27 ++--
 .../airflow/serialization/serialized_objects.py    |   3 +
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |   3 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   3 +
 .../core_api/routes/public/test_task_instances.py  | 141 ++++++++++++++++++---
 .../src/airflowctl/api/datamodels/generated.py     |   8 +-
 8 files changed, 167 insertions(+), 27 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 45fa65762c6..1d96a91c51d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -150,7 +150,11 @@ class ClearTaskInstancesBody(StrictBaseModel):
     only_failed: bool = True
     only_running: bool = False
     reset_dag_runs: bool = True
-    task_ids: list[str | tuple[str, int]] | None = None
+    task_ids: list[str | tuple[str, int]] | None = Field(
+        default=None,
+        description="A list of `task_id` or [`task_id`, `map_index`]. "
+        "If only the `task_id` is provided for a mapped task, all of its map 
indices will be targeted.",
+    )
     dag_run_id: str | None = None
     include_upstream: bool = False
     include_downstream: bool = False
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 419746b7bf4..eae45b1f1c1 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -9501,6 +9501,9 @@ components:
             type: array
           - type: 'null'
           title: Task Ids
+          description: A list of `task_id` or [`task_id`, `map_index`]. If 
only the
+            `task_id` is provided for a mapped task, all of its map indices 
will be
+            targeted.
         dag_run_id:
           anyOf:
           - type: string
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index f834d3b5180..28485e945f3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -745,16 +745,25 @@ def post_clear_task_instances(
 
     task_ids = body.task_ids
     if task_ids is not None:
-        task_id = [task[0] if isinstance(task, tuple) else task for task in 
task_ids]
-        dag = dag.partial_subset(
-            task_ids=task_id,
-            include_downstream=downstream,
-            include_upstream=upstream,
-        )
+        tasks = set(task_ids)
+        mapped_tasks_tuples = set(t for t in tasks if isinstance(t, tuple))
+        # Unmapped tasks are expressed in their task_ids (without map_indexes)
+        unmapped_task_ids = set(t for t in tasks if not isinstance(t, tuple))
+
+        if upstream or downstream:
+            mapped_task_ids = set(tid for tid, _ in mapped_tasks_tuples)
+            relatives = dag.partial_subset(
+                task_ids=unmapped_task_ids | mapped_task_ids,
+                include_downstream=downstream,
+                include_upstream=upstream,
+                exclude_original=True,
+            )
+            unmapped_task_ids = unmapped_task_ids | 
set(relatives.task_dict.keys())
 
-        if len(dag.task_dict) > 1:
-            # If we had upstream/downstream etc then also include those!
-            task_ids.extend(tid for tid in dag.task_dict if tid != task_id)
+        mapped_tasks_list = [
+            (tid, map_id) for tid, map_id in mapped_tasks_tuples if tid not in 
unmapped_task_ids
+        ]
+        task_ids = mapped_tasks_list + list(unmapped_task_ids)
 
     # Prepare common parameters
     common_params = {
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 94a0a3dfafd..47e790da150 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2930,6 +2930,7 @@ class SerializedDAG(BaseSerialization):
         include_downstream: bool = False,
         include_upstream: bool = True,
         include_direct_upstream: bool = False,
+        exclude_original: bool = False,
     ):
         from airflow.models.mappedoperator import MappedOperator as 
SerializedMappedOperator
 
@@ -2980,6 +2981,8 @@ class SerializedDAG(BaseSerialization):
             return copy.deepcopy(t, memo)
 
         # Compiling the unique list of tasks that made the cut
+        if exclude_original:
+            matched_tasks = []
         dag.task_dict = {
             t.task_id: _deepcopy_task(t)
             for t in itertools.chain(matched_tasks, also_include, 
direct_upstreams)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 159478097f6..5e826f98f4b 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1290,7 +1290,8 @@ export const $ClearTaskInstancesBody = {
                     type: 'null'
                 }
             ],
-            title: 'Task Ids'
+            title: 'Task Ids',
+            description: 'A list of `task_id` or [`task_id`, `map_index`]. If 
only the `task_id` is provided for a mapped task, all of its map indices will 
be targeted.'
         },
         dag_run_id: {
             anyOf: [
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 892f630989d..6db170921b0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -406,6 +406,9 @@ export type ClearTaskInstancesBody = {
     only_failed?: boolean;
     only_running?: boolean;
     reset_dag_runs?: boolean;
+    /**
+     * A list of `task_id` or [`task_id`, `map_index`]. If only the `task_id` 
is provided for a mapped task, all of its map indices will be targeted.
+     */
     task_ids?: Array<(string | [
     string,
     number
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 6aaa54b8479..85f32d2f817 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -119,12 +119,14 @@ class TestTaskInstanceEndpoint:
         dag_version = DagVersion.get_latest_version(dag.dag_id, 
session=session)
         tis = []
         for i in range(counter):
-            if task_instances is None:
-                pass
-            elif update_extras:
-                self.ti_extras.update(task_instances[i])
-            else:
-                self.ti_init.update(task_instances[i])
+            map_indexes = (-1,)
+            if task_instances:
+                map_index = task_instances[i].get("map_index", -1)
+                map_indexes = task_instances[i].pop("map_indexes", 
(map_index,))
+                if update_extras:
+                    self.ti_extras.update(task_instances[i])
+                else:
+                    self.ti_init.update(task_instances[i])
 
             if "logical_date" in self.ti_init:
                 run_id = f"TEST_DAG_RUN_ID_{i}"
@@ -143,14 +145,17 @@ class TestTaskInstanceEndpoint:
                 session.flush()
             if TYPE_CHECKING:
                 assert dag_version
-            ti = TaskInstance(task=tasks[i], **self.ti_init, 
dag_version_id=dag_version.id)
-            session.add(ti)
-            ti.dag_run = dr
-            ti.note = "placeholder-note"
 
-            for key, value in self.ti_extras.items():
-                setattr(ti, key, value)
-            tis.append(ti)
+            for mi in map_indexes:
+                kwargs = self.ti_init | {"map_index": mi}
+                ti = TaskInstance(task=tasks[i], **kwargs, 
dag_version_id=dag_version.id)
+                session.add(ti)
+                ti.dag_run = dr
+                ti.note = "placeholder-note"
+
+                for key, value in self.ti_extras.items():
+                    setattr(ti, key, value)
+                tis.append(ti)
 
         session.flush()
 
@@ -2502,10 +2507,116 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
                 "example_python_operator",
                 {
                     "dry_run": False,
-                    "task_ids": [["print_the_context", 0], "sleep_for_1"],
+                    "task_ids": [["print_the_context", -1], "sleep_for_1"],
                 },
                 2,
-                id="clear mapped task and unmapped tasks together",
+                id="clear unmapped tasks with and without map index",
+            ),
+            pytest.param(
+                "example_task_mapping_second_order",
+                [
+                    {
+                        "logical_date": DEFAULT_DATETIME_1,
+                        "state": State.FAILED,
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=1),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=2),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                ],
+                "example_task_mapping_second_order",
+                {
+                    "dry_run": False,
+                    "task_ids": [["times_2", 0], ["add_10", 1]],
+                },
+                2,
+                id="clear multiple mapped tasks",
+            ),
+            pytest.param(
+                "example_task_mapping_second_order",
+                [
+                    {
+                        "logical_date": DEFAULT_DATETIME_1,
+                        "state": State.FAILED,
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=1),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=2),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                ],
+                "example_task_mapping_second_order",
+                {
+                    "dry_run": False,
+                    "task_ids": [["times_2", 0], ["add_10", 1]],
+                    "include_upstream": True,
+                },
+                5,
+                id="clear mapped tasks and upstream tasks",
+            ),
+            pytest.param(
+                "example_task_mapping_second_order",
+                [
+                    {
+                        "logical_date": DEFAULT_DATETIME_1,
+                        "state": State.FAILED,
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=1),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=2),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                ],
+                "example_task_mapping_second_order",
+                {
+                    "dry_run": False,
+                    "task_ids": [["times_2", 0], ["add_10", 1]],
+                    "include_downstream": True,
+                },
+                4,
+                id="clear mapped tasks and downstream tasks",
+            ),
+            pytest.param(
+                "example_task_mapping_second_order",
+                [
+                    {
+                        "logical_date": DEFAULT_DATETIME_1,
+                        "state": State.FAILED,
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=1),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                    {
+                        "logical_date": DEFAULT_DATETIME_1 + 
dt.timedelta(days=2),
+                        "state": State.FAILED,
+                        "map_indexes": (0, 1, 2),
+                    },
+                ],
+                "example_task_mapping_second_order",
+                {
+                    "dry_run": False,
+                    "task_ids": [["times_2", 0], "add_10"],
+                },
+                4,
+                id="clear mapped tasks with and without map index",
             ),
         ],
     )
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 15d090f7d10..dc87837b8c2 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -153,7 +153,13 @@ class ClearTaskInstancesBody(BaseModel):
     only_failed: Annotated[bool | None, Field(title="Only Failed")] = True
     only_running: Annotated[bool | None, Field(title="Only Running")] = False
     reset_dag_runs: Annotated[bool | None, Field(title="Reset Dag Runs")] = 
True
-    task_ids: Annotated[list[str | TaskIds] | None, Field(title="Task Ids")] = 
None
+    task_ids: Annotated[
+        list[str | TaskIds] | None,
+        Field(
+            description="A list of `task_id` or [`task_id`, `map_index`]. If 
only the `task_id` is provided for a mapped task, all of its map indices will 
be targeted.",
+            title="Task Ids",
+        ),
+    ] = None
     dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None
     include_upstream: Annotated[bool | None, Field(title="Include Upstream")] 
= False
     include_downstream: Annotated[bool | None, Field(title="Include 
Downstream")] = False

Reply via email to