This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa28f27f12d Remove all_map_indices from task_state_store.clear() in
context (#68880)
aa28f27f12d is described below
commit aa28f27f12df228f6f1410a15d363699248eabda
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 23 14:27:39 2026 +0530
Remove all_map_indices from task_state_store.clear() in context (#68880)
---
.../docs/core-concepts/task-state-store.rst | 19 ++++++-----------
.../execution_api/routes/task_state_store.py | 24 +++-------------------
.../versions/head/test_task_state_store.py | 19 -----------------
shared/state/src/airflow_shared/state/__init__.py | 15 +++++++-------
task-sdk/src/airflow/sdk/api/client.py | 5 ++---
task-sdk/src/airflow/sdk/execution_time/comms.py | 1 -
task-sdk/src/airflow/sdk/execution_time/context.py | 14 ++++---------
.../airflow/sdk/execution_time/schema/schema.json | 5 -----
.../src/airflow/sdk/execution_time/supervisor.py | 2 +-
task-sdk/tests/task_sdk/api/test_client.py | 12 +----------
.../tests/task_sdk/execution_time/test_context.py | 19 +++--------------
.../task_sdk/execution_time/test_supervisor.py | 13 +-----------
.../task_sdk/execution_time/test_task_runner.py | 15 +++-----------
13 files changed, 32 insertions(+), 131 deletions(-)
diff --git a/airflow-core/docs/core-concepts/task-state-store.rst
b/airflow-core/docs/core-concepts/task-state-store.rst
index c1f5b6475e1..7702fb90f68 100644
--- a/airflow-core/docs/core-concepts/task-state-store.rst
+++ b/airflow-core/docs/core-concepts/task-state-store.rst
@@ -124,21 +124,15 @@ Deletes a single key. No-op if the key does not exist.
task_state_store.delete("job_id")
-``clear(all_map_indices=False)``
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+``clear()``
+~~~~~~~~~~~
Deletes *all* task state store keys for this task instance.
-For :doc:`mapped tasks </authoring-and-scheduling/dynamic-task-mapping>`, the
default clears only the current map index. Pass ``all_map_indices=True`` to
wipe the store across **every** mapped instance of the task (fleet-wide reset).
-
.. code-block:: python
- # clear only this map index
task_state_store.clear()
- # clear all map indices (fleet-wide)
- task_state_store.clear(all_map_indices=True)
-
Some Example Use Cases
----------------------
@@ -255,16 +249,15 @@ Once a task defers, the Triggerer handles continuity
across poke cycles. Use tas
Mapped tasks
------------
-When a task is dynamically mapped (``task.expand(...)``), each map index has
its own task state store namespace. ``clear()`` without arguments clears the
store only for the current index. ``clear(all_map_indices=True)`` wipes the
store across every index of the task.
+When a task is dynamically mapped (``task.expand(...)``), each map index has
its own task state store namespace. ``clear()`` clears only the current index's
store.
+
+To wipe state across all map indices of a task, use the :doc:`Core API
</administration-and-deployment/task-and-asset-state-store>` (e.g. via the UI
or CLI) once every mapped instance of the task has finished.
.. code-block:: python
- # Inside a mapped task — clear only this index
+ # Inside a mapped task — clears only this index
task_state_store.clear()
- # Wipe store for all indices of this task
- task_state_store.clear(all_map_indices=True)
-
Automatic cleanup (``clear_on_success``)
----------------------------------------
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
index 722aef991a9..624926fe639 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
@@ -21,7 +21,7 @@ from typing import Annotated
from uuid import UUID
from cadwyn import VersionedAPIRouter
-from fastapi import HTTPException, Path, Query, Security, status
+from fastapi import HTTPException, Path, Security, status
from sqlalchemy.orm import Session
from airflow._shared.state import TaskScope
@@ -105,25 +105,7 @@ def delete_task_state_store(
def clear_task_state_store(
task_instance_id: UUID,
session: SessionDep,
- all_map_indices: Annotated[bool, Query()] = False,
) -> None:
- """
- Delete all task state store keys for this task instance.
-
- By default, only keys for the requesting TI's exact ``map_index`` are
- cleared — same isolation as DELETE endpoint above.
-
- Pass ``?all_map_indices=true`` to wipe state store for every mapped
sibling of
- the task in the same DAG run. This is intentionally fleet-wide: the
- ``ti:self`` JWT authentication scope authenticates that the caller is
- a legitimate member of the mapped task group, and grants it authority
- to reset shared task state store on behalf of the whole group.
- The SDK only forwards this flag when the user calls
``task_state_store.clear(all_map_indices=True)``
- explicitly, so the expanded scope is always an explicit opt-in by the task
author.
-
- For non-mapped tasks (``map_index=-1``), there is only ever one index, so
- ``?all_map_indices=true`` is functionally identical to the default and is
- accepted without error.
- """
+ """Delete all task state store keys for this task instance."""
scope = _get_task_scope_for_ti(task_instance_id, session)
- get_state_backend().clear(scope, all_map_indices=all_map_indices,
session=session)
+ get_state_backend().clear(scope, session=session)
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
index d15bf5dc1e0..f544de4f95e 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
@@ -296,25 +296,6 @@ class TestClearTaskState:
)
assert remaining_indices == [0, 1]
- def test_clear_with_all_map_indices_query_param_wipes_fleet(
- self, client: TestClient, create_task_instance: CreateTaskInstance
- ):
- """Clear with ?all_map_indices=true wipes state for every mapped
instance."""
- ti = create_task_instance(map_index=2)
- self._seed_fleet_rows(ti, (0, 1, 2))
-
- response = client.delete(_api_url(ti.id), params={"all_map_indices":
"true"})
-
- assert response.status_code == 204
- with create_session() as session:
- remaining = session.scalars(
- select(TaskStateStoreModel).where(
- TaskStateStoreModel.dag_id == ti.dag_id,
- TaskStateStoreModel.task_id == ti.task_id,
- )
- ).all()
- assert remaining == []
-
class TestTiSelfEnforcement:
@pytest.fixture
diff --git a/shared/state/src/airflow_shared/state/__init__.py
b/shared/state/src/airflow_shared/state/__init__.py
index c8c7549ffcd..e2c094d8481 100644
--- a/shared/state/src/airflow_shared/state/__init__.py
+++ b/shared/state/src/airflow_shared/state/__init__.py
@@ -37,9 +37,7 @@ class TaskScope:
``map_index`` defaults to ``-1`` for non-mapped tasks. For mapped tasks,
set it to the actual mapped index. ``get``/``set``/``delete`` always match
- on ``(dag_id, run_id, task_id, map_index)`` exactly. To wipe state across
- every map index of the task, call ``clear``/``aclear`` with
- ``all_map_indices=True``.
+ on ``(dag_id, run_id, task_id, map_index)`` exactly.
"""
dag_id: str
@@ -183,8 +181,9 @@ class BaseStoreBackend(ABC):
Must handle both ``TaskScope`` and ``AssetScope``.
For ``TaskScope``: by default, only keys for the exact ``map_index``
on the
- scope are cleared. Pass ``all_map_indices=True`` to drop the
``map_index``
- filter entirely and wipe state across every mapped instance of the
task.
+ scope are cleared. When ``all_map_indices=True``, the ``map_index``
filter is
+ dropped and state is wiped across every mapped instance — for use by
external
+ callers (UI, CLI) only, not from within a running task.
For ``AssetScope`` the flag has no effect.
"""
@@ -231,8 +230,10 @@ class BaseStoreBackend(ABC):
Async variant of clear. Must handle both ``TaskScope`` and
``AssetScope``.
For ``TaskScope``: by default, only keys for the exact ``map_index``
on the
- scope are cleared. Pass ``all_map_indices=True`` to wipe state across
every
- mapped instance of the task. For ``AssetScope`` the flag has no effect.
+ scope are cleared. When ``all_map_indices=True``, the ``map_index``
filter is
+ dropped and state is wiped across every mapped instance — for use by
external
+ callers (UI, CLI) only, not from within a running task.
+ For ``AssetScope`` the flag has no effect.
``session`` is optional. If provided, implementations should use it
directly.
If ``None``, implementations manage their own async session internally.
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index 66496d5cac9..037ddd7eb5f 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -743,10 +743,9 @@ class TaskStateStoreOperations:
self.client.delete(f"store/ti/{ti_id}/{key}")
return OKResponse(ok=True)
- def clear(self, ti_id: uuid.UUID, all_map_indices: bool = False) ->
OKResponse:
+ def clear(self, ti_id: uuid.UUID) -> OKResponse:
"""Clear all task store keys for a task instance via the API server."""
- params = {"all_map_indices": "true"} if all_map_indices else {}
- self.client.delete(f"store/ti/{ti_id}", params=params)
+ self.client.delete(f"store/ti/{ti_id}")
return OKResponse(ok=True)
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index a042d7a1e80..c70b3e9b518 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -943,7 +943,6 @@ class DeleteTaskStateStore(BaseModel):
class ClearTaskStateStore(BaseModel):
ti_id: UUID
- all_map_indices: bool = False
type: Literal["ClearTaskStateStore"] = "ClearTaskStateStore"
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 6753d710df4..8d14f0b63b2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -636,23 +636,17 @@ class TaskStateStoreAccessor:
if backend is not None:
backend.delete(self._scope, key)
- def clear(self, all_map_indices: bool = False) -> None:
- """
- Delete all keys for this task instance.
-
- Pass ``all_map_indices=True`` to wipe state across every mapped
- instance of the task (fleet-wide reset). Defaults to clearing only
- this task instance's own state.
- """
+ def clear(self) -> None:
+ """Delete all keys for this task instance."""
from airflow.sdk.execution_time.comms import ClearTaskStateStore
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
# cleanup the DB ref first, if backend cleanup fails after this, the
ref is gone and
# deterministic keys are recoverable on next set().
- SUPERVISOR_COMMS.send(ClearTaskStateStore(ti_id=self._ti_id,
all_map_indices=all_map_indices))
+ SUPERVISOR_COMMS.send(ClearTaskStateStore(ti_id=self._ti_id))
backend = _get_worker_state_store_backend()
if backend is not None:
- backend.clear(self._scope, all_map_indices=all_map_indices)
+ backend.clear(self._scope)
def _clear_backend_only(self) -> None:
"""
diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
index 4807d9f53b3..733aab6603d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
+++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
@@ -487,11 +487,6 @@
"title": "Ti Id",
"type": "string"
},
- "all_map_indices": {
- "default": false,
- "title": "All Map Indices",
- "type": "boolean"
- },
"type": {
"const": "ClearTaskStateStore",
"default": "ClearTaskStateStore",
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 9f1c71d4164..76dfd2009ac 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1847,7 +1847,7 @@ class ActivitySubprocess(WatchedSubprocess):
self.client.task_state_store.delete(msg.ti_id, msg.key)
resp = OKResponse(ok=True)
elif isinstance(msg, ClearTaskStateStore):
- self.client.task_state_store.clear(msg.ti_id,
all_map_indices=msg.all_map_indices)
+ self.client.task_state_store.clear(msg.ti_id)
resp = OKResponse(ok=True)
elif isinstance(msg, GetAssetStateStoreByName):
asset_store = self.client.asset_state_store.get(msg.key,
name=msg.name)
diff --git a/task-sdk/tests/task_sdk/api/test_client.py
b/task-sdk/tests/task_sdk/api/test_client.py
index 0d6ee44628a..92b970f7ee5 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -1901,26 +1901,16 @@ class TestTaskStateOperations:
result = client.task_state_store.delete(ti_id=self.TI_ID, key="job_id")
assert result == OKResponse(ok=True)
- def test_clear_default_no_query_param(self):
+ def test_clear_sends_delete_request(self):
def handle_request(request: httpx.Request) -> httpx.Response:
assert request.method == "DELETE"
assert request.url.path == f"/store/ti/{self.TI_ID}"
- assert "all_map_indices" not in str(request.url.query)
return httpx.Response(status_code=204)
client = make_client(transport=httpx.MockTransport(handle_request))
result = client.task_state_store.clear(ti_id=self.TI_ID)
assert result == OKResponse(ok=True)
- def test_clear_all_map_indices_sends_query_param(self):
- def handle_request(request: httpx.Request) -> httpx.Response:
- assert "all_map_indices=true" in str(request.url.query)
- return httpx.Response(status_code=204)
-
- client = make_client(transport=httpx.MockTransport(handle_request))
- result = client.task_state_store.clear(ti_id=self.TI_ID,
all_map_indices=True)
- assert result == OKResponse(ok=True)
-
class TestAssetStateOperations:
def test_get_by_name_success(self):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index 6f09489a88d..f82f4924fba 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1278,23 +1278,12 @@ class TestTaskStateStoreAccessor:
DeleteTaskStateStore(ti_id=self.TI_ID, key="job_id")
)
- def test_clear_default_sends_all_map_indices_false(self,
mock_supervisor_comms):
+ def test_clear_sends_comms_message(self, mock_supervisor_comms):
mock_supervisor_comms.send.return_value = OKResponse(ok=True)
TaskStateStoreAccessor(ti_id=self.TI_ID, scope=self.SCOPE).clear()
- mock_supervisor_comms.send.assert_called_once_with(
- ClearTaskStateStore(ti_id=self.TI_ID, all_map_indices=False)
- )
-
- def test_clear_all_map_indices_sends_flag_true(self,
mock_supervisor_comms):
- mock_supervisor_comms.send.return_value = OKResponse(ok=True)
-
- TaskStateStoreAccessor(ti_id=self.TI_ID,
scope=self.SCOPE).clear(all_map_indices=True)
-
- mock_supervisor_comms.send.assert_called_once_with(
- ClearTaskStateStore(ti_id=self.TI_ID, all_map_indices=True)
- )
+
mock_supervisor_comms.send.assert_called_once_with(ClearTaskStateStore(ti_id=self.TI_ID))
def test_set_datetime_raises_validation_error(self, mock_supervisor_comms):
"""datetime is not JSON-serializable; callers must use .isoformat()
first."""
@@ -1788,9 +1777,7 @@ class TestTaskStateStoreAccessorWithCustomBackend:
assert "job_id" not in backend._actual_key_value_store
assert "checkpoint" not in backend._actual_key_value_store
- mock_supervisor_comms.send.assert_any_call(
- ClearTaskStateStore(ti_id=self.TI_ID, all_map_indices=False)
- )
+
mock_supervisor_comms.send.assert_any_call(ClearTaskStateStore(ti_id=self.TI_ID))
class TestAssetStateStoreAccessorWithCustomBackend:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 48932581954..6d350df4b58 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2872,18 +2872,7 @@ REQUEST_TEST_CASES = [
client_mock=ClientMock(
method_path="task_state_store.clear",
args=(TI_ID,),
- kwargs={"all_map_indices": False},
- response=OKResponse(ok=True),
- ),
- expected_body={"ok": True, "type": "OKResponse"},
- ),
- RequestTestCase(
- message=ClearTaskStateStore(ti_id=TI_ID, all_map_indices=True),
- test_id="clear_task_store_all_map_indices",
- client_mock=ClientMock(
- method_path="task_state_store.clear",
- args=(TI_ID,),
- kwargs={"all_map_indices": True},
+ kwargs={},
response=OKResponse(ok=True),
),
expected_body={"ok": True, "type": "OKResponse"},
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 4fd0692425b..6e7322429ff 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5699,24 +5699,15 @@ class TestTaskInstanceStateOperations:
mock_supervisor_comms.send.assert_any_call(DeleteTaskStateStore(ti_id=runtime_ti.id,
key="job_id"))
- @pytest.mark.parametrize(
- ("call_kwargs", "expected_flag"),
- [
- pytest.param({}, False, id="default"),
- pytest.param({"all_map_indices": True}, True, id="fleet-wipe"),
- ],
- )
- def test_task_can_clear_state(self, call_kwargs, expected_flag,
create_runtime_ti, mock_supervisor_comms):
+ def test_task_can_clear_state(self, create_runtime_ti,
mock_supervisor_comms):
class MyOperator(BaseOperator):
def execute(self, context):
- context["task_state_store"].clear(**call_kwargs)
+ context["task_state_store"].clear()
task = MyOperator(task_id="t")
runtime_ti = create_runtime_ti(task=task)
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
- mock_supervisor_comms.send.assert_any_call(
- ClearTaskStateStore(ti_id=runtime_ti.id,
all_map_indices=expected_flag)
- )
+
mock_supervisor_comms.send.assert_any_call(ClearTaskStateStore(ti_id=runtime_ti.id))
@staticmethod
def _watcher_side_effect(msg=None, *args, **kwargs):