This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 800f733c22f fix: enable api to clear ti instances by specifying map indexes (#56346)
800f733c22f is described below
commit 800f733c22f1e1c480f870a6aa5e0f0e4cea2a53
Author: Zhen-Lun (Kevin) Hong <[email protected]>
AuthorDate: Tue Oct 21 18:59:25 2025 +0800
fix: enable api to clear ti instances by specifying map indexes (#56346)
* fix: enable api to clear ti instances by specifying map indexes
* chore: add tests for clearing mapped task instances from api endpoint
* chore: add descriptions to task_ids in the payload
* fix: deal with tests that were broken when map_indexes was introduced
* chore: generate datamodel and api spec
* chore: rewrite clear task_ids for clarity and remove duplicate tasks
* Small adjustments
---------
Co-authored-by: pierrejeambrun <[email protected]>
---
.../core_api/datamodels/task_instances.py | 6 +-
.../core_api/openapi/v2-rest-api-generated.yaml | 3 +
.../core_api/routes/public/task_instances.py | 27 ++--
.../airflow/serialization/serialized_objects.py | 3 +
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 3 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 3 +
.../core_api/routes/public/test_task_instances.py | 141 ++++++++++++++++++---
.../src/airflowctl/api/datamodels/generated.py | 8 +-
8 files changed, 167 insertions(+), 27 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 45fa65762c6..1d96a91c51d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -150,7 +150,11 @@ class ClearTaskInstancesBody(StrictBaseModel):
only_failed: bool = True
only_running: bool = False
reset_dag_runs: bool = True
- task_ids: list[str | tuple[str, int]] | None = None
+ task_ids: list[str | tuple[str, int]] | None = Field(
+ default=None,
+ description="A list of `task_id` or [`task_id`, `map_index`]. "
+ "If only the `task_id` is provided for a mapped task, all of its map
indices will be targeted.",
+ )
dag_run_id: str | None = None
include_upstream: bool = False
include_downstream: bool = False
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 419746b7bf4..eae45b1f1c1 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -9501,6 +9501,9 @@ components:
type: array
- type: 'null'
title: Task Ids
+ description: A list of `task_id` or [`task_id`, `map_index`]. If
only the
+ `task_id` is provided for a mapped task, all of its map indices
will be
+ targeted.
dag_run_id:
anyOf:
- type: string
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index f834d3b5180..28485e945f3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -745,16 +745,25 @@ def post_clear_task_instances(
task_ids = body.task_ids
if task_ids is not None:
- task_id = [task[0] if isinstance(task, tuple) else task for task in
task_ids]
- dag = dag.partial_subset(
- task_ids=task_id,
- include_downstream=downstream,
- include_upstream=upstream,
- )
+ tasks = set(task_ids)
+ mapped_tasks_tuples = set(t for t in tasks if isinstance(t, tuple))
+ # Unmapped tasks are expressed in their task_ids (without map_indexes)
+ unmapped_task_ids = set(t for t in tasks if not isinstance(t, tuple))
+
+ if upstream or downstream:
+ mapped_task_ids = set(tid for tid, _ in mapped_tasks_tuples)
+ relatives = dag.partial_subset(
+ task_ids=unmapped_task_ids | mapped_task_ids,
+ include_downstream=downstream,
+ include_upstream=upstream,
+ exclude_original=True,
+ )
+ unmapped_task_ids = unmapped_task_ids |
set(relatives.task_dict.keys())
- if len(dag.task_dict) > 1:
- # If we had upstream/downstream etc then also include those!
- task_ids.extend(tid for tid in dag.task_dict if tid != task_id)
+ mapped_tasks_list = [
+ (tid, map_id) for tid, map_id in mapped_tasks_tuples if tid not in
unmapped_task_ids
+ ]
+ task_ids = mapped_tasks_list + list(unmapped_task_ids)
# Prepare common parameters
common_params = {
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 94a0a3dfafd..47e790da150 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2930,6 +2930,7 @@ class SerializedDAG(BaseSerialization):
include_downstream: bool = False,
include_upstream: bool = True,
include_direct_upstream: bool = False,
+ exclude_original: bool = False,
):
from airflow.models.mappedoperator import MappedOperator as
SerializedMappedOperator
@@ -2980,6 +2981,8 @@ class SerializedDAG(BaseSerialization):
return copy.deepcopy(t, memo)
# Compiling the unique list of tasks that made the cut
+ if exclude_original:
+ matched_tasks = []
dag.task_dict = {
t.task_id: _deepcopy_task(t)
for t in itertools.chain(matched_tasks, also_include,
direct_upstreams)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 159478097f6..5e826f98f4b 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1290,7 +1290,8 @@ export const $ClearTaskInstancesBody = {
type: 'null'
}
],
- title: 'Task Ids'
+ title: 'Task Ids',
+ description: 'A list of `task_id` or [`task_id`, `map_index`]. If
only the `task_id` is provided for a mapped task, all of its map indices will
be targeted.'
},
dag_run_id: {
anyOf: [
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 892f630989d..6db170921b0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -406,6 +406,9 @@ export type ClearTaskInstancesBody = {
only_failed?: boolean;
only_running?: boolean;
reset_dag_runs?: boolean;
+ /**
+ * A list of `task_id` or [`task_id`, `map_index`]. If only the `task_id`
is provided for a mapped task, all of its map indices will be targeted.
+ */
task_ids?: Array<(string | [
string,
number
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 6aaa54b8479..85f32d2f817 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -119,12 +119,14 @@ class TestTaskInstanceEndpoint:
dag_version = DagVersion.get_latest_version(dag.dag_id,
session=session)
tis = []
for i in range(counter):
- if task_instances is None:
- pass
- elif update_extras:
- self.ti_extras.update(task_instances[i])
- else:
- self.ti_init.update(task_instances[i])
+ map_indexes = (-1,)
+ if task_instances:
+ map_index = task_instances[i].get("map_index", -1)
+ map_indexes = task_instances[i].pop("map_indexes",
(map_index,))
+ if update_extras:
+ self.ti_extras.update(task_instances[i])
+ else:
+ self.ti_init.update(task_instances[i])
if "logical_date" in self.ti_init:
run_id = f"TEST_DAG_RUN_ID_{i}"
@@ -143,14 +145,17 @@ class TestTaskInstanceEndpoint:
session.flush()
if TYPE_CHECKING:
assert dag_version
- ti = TaskInstance(task=tasks[i], **self.ti_init,
dag_version_id=dag_version.id)
- session.add(ti)
- ti.dag_run = dr
- ti.note = "placeholder-note"
- for key, value in self.ti_extras.items():
- setattr(ti, key, value)
- tis.append(ti)
+ for mi in map_indexes:
+ kwargs = self.ti_init | {"map_index": mi}
+ ti = TaskInstance(task=tasks[i], **kwargs,
dag_version_id=dag_version.id)
+ session.add(ti)
+ ti.dag_run = dr
+ ti.note = "placeholder-note"
+
+ for key, value in self.ti_extras.items():
+ setattr(ti, key, value)
+ tis.append(ti)
session.flush()
@@ -2502,10 +2507,116 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
"example_python_operator",
{
"dry_run": False,
- "task_ids": [["print_the_context", 0], "sleep_for_1"],
+ "task_ids": [["print_the_context", -1], "sleep_for_1"],
},
2,
- id="clear mapped task and unmapped tasks together",
+ id="clear unmapped tasks with and without map index",
+ ),
+ pytest.param(
+ "example_task_mapping_second_order",
+ [
+ {
+ "logical_date": DEFAULT_DATETIME_1,
+ "state": State.FAILED,
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=1),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=2),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ ],
+ "example_task_mapping_second_order",
+ {
+ "dry_run": False,
+ "task_ids": [["times_2", 0], ["add_10", 1]],
+ },
+ 2,
+ id="clear multiple mapped tasks",
+ ),
+ pytest.param(
+ "example_task_mapping_second_order",
+ [
+ {
+ "logical_date": DEFAULT_DATETIME_1,
+ "state": State.FAILED,
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=1),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=2),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ ],
+ "example_task_mapping_second_order",
+ {
+ "dry_run": False,
+ "task_ids": [["times_2", 0], ["add_10", 1]],
+ "include_upstream": True,
+ },
+ 5,
+ id="clear mapped tasks and upstream tasks",
+ ),
+ pytest.param(
+ "example_task_mapping_second_order",
+ [
+ {
+ "logical_date": DEFAULT_DATETIME_1,
+ "state": State.FAILED,
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=1),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=2),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ ],
+ "example_task_mapping_second_order",
+ {
+ "dry_run": False,
+ "task_ids": [["times_2", 0], ["add_10", 1]],
+ "include_downstream": True,
+ },
+ 4,
+ id="clear mapped tasks and downstream tasks",
+ ),
+ pytest.param(
+ "example_task_mapping_second_order",
+ [
+ {
+ "logical_date": DEFAULT_DATETIME_1,
+ "state": State.FAILED,
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=1),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ {
+ "logical_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=2),
+ "state": State.FAILED,
+ "map_indexes": (0, 1, 2),
+ },
+ ],
+ "example_task_mapping_second_order",
+ {
+ "dry_run": False,
+ "task_ids": [["times_2", 0], "add_10"],
+ },
+ 4,
+ id="clear mapped tasks with and without map index",
),
],
)
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 15d090f7d10..dc87837b8c2 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -153,7 +153,13 @@ class ClearTaskInstancesBody(BaseModel):
only_failed: Annotated[bool | None, Field(title="Only Failed")] = True
only_running: Annotated[bool | None, Field(title="Only Running")] = False
reset_dag_runs: Annotated[bool | None, Field(title="Reset Dag Runs")] =
True
- task_ids: Annotated[list[str | TaskIds] | None, Field(title="Task Ids")] =
None
+ task_ids: Annotated[
+ list[str | TaskIds] | None,
+ Field(
+ description="A list of `task_id` or [`task_id`, `map_index`]. If
only the `task_id` is provided for a mapped task, all of its map indices will
be targeted.",
+ title="Task Ids",
+ ),
+ ] = None
dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None
include_upstream: Annotated[bool | None, Field(title="Include Upstream")]
= False
include_downstream: Annotated[bool | None, Field(title="Include
Downstream")] = False