This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa28f27f12d Remove all_map_indices from task_state_store.clear() in 
context (#68880)
aa28f27f12d is described below

commit aa28f27f12df228f6f1410a15d363699248eabda
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 23 14:27:39 2026 +0530

    Remove all_map_indices from task_state_store.clear() in context (#68880)
---
 .../docs/core-concepts/task-state-store.rst        | 19 ++++++-----------
 .../execution_api/routes/task_state_store.py       | 24 +++-------------------
 .../versions/head/test_task_state_store.py         | 19 -----------------
 shared/state/src/airflow_shared/state/__init__.py  | 15 +++++++-------
 task-sdk/src/airflow/sdk/api/client.py             |  5 ++---
 task-sdk/src/airflow/sdk/execution_time/comms.py   |  1 -
 task-sdk/src/airflow/sdk/execution_time/context.py | 14 ++++---------
 .../airflow/sdk/execution_time/schema/schema.json  |  5 -----
 .../src/airflow/sdk/execution_time/supervisor.py   |  2 +-
 task-sdk/tests/task_sdk/api/test_client.py         | 12 +----------
 .../tests/task_sdk/execution_time/test_context.py  | 19 +++--------------
 .../task_sdk/execution_time/test_supervisor.py     | 13 +-----------
 .../task_sdk/execution_time/test_task_runner.py    | 15 +++-----------
 13 files changed, 32 insertions(+), 131 deletions(-)

diff --git a/airflow-core/docs/core-concepts/task-state-store.rst 
b/airflow-core/docs/core-concepts/task-state-store.rst
index c1f5b6475e1..7702fb90f68 100644
--- a/airflow-core/docs/core-concepts/task-state-store.rst
+++ b/airflow-core/docs/core-concepts/task-state-store.rst
@@ -124,21 +124,15 @@ Deletes a single key. No-op if the key does not exist.
 
     task_state_store.delete("job_id")
 
-``clear(all_map_indices=False)``
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+``clear()``
+~~~~~~~~~~~
 
 Deletes *all* task state store keys for this task instance.
 
-For :doc:`mapped tasks </authoring-and-scheduling/dynamic-task-mapping>`, the 
default clears only the current map index. Pass ``all_map_indices=True`` to 
wipe the store across **every** mapped instance of the task (fleet-wide reset).
-
 .. code-block:: python
 
-    # clear only this map index
     task_state_store.clear()
 
-    # clear all map indices (fleet-wide)
-    task_state_store.clear(all_map_indices=True)
-
 
 Some Example Use Cases
 ----------------------
@@ -255,16 +249,15 @@ Once a task defers, the Triggerer handles continuity 
across poke cycles. Use tas
 Mapped tasks
 ------------
 
-When a task is dynamically mapped (``task.expand(...)``), each map index has 
its own task state store namespace. ``clear()`` without arguments clears the 
store only for the current index. ``clear(all_map_indices=True)`` wipes the 
store across every index of the task.
+When a task is dynamically mapped (``task.expand(...)``), each map index has 
its own task state store namespace. ``clear()`` clears only the current index's 
store.
+
+To wipe state across all map indices of a task, use the :doc:`Core API 
</administration-and-deployment/task-and-asset-state-store>` (e.g. via the UI 
or CLI) after the task group has finished.
 
 .. code-block:: python
 
-    # Inside a mapped task — clear only this index
+    # Inside a mapped task — clears only this index
     task_state_store.clear()
 
-    # Wipe store for all indices of this task
-    task_state_store.clear(all_map_indices=True)
-
 
 Automatic cleanup (``clear_on_success``)
 ----------------------------------------
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
index 722aef991a9..624926fe639 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py
@@ -21,7 +21,7 @@ from typing import Annotated
 from uuid import UUID
 
 from cadwyn import VersionedAPIRouter
-from fastapi import HTTPException, Path, Query, Security, status
+from fastapi import HTTPException, Path, Security, status
 from sqlalchemy.orm import Session
 
 from airflow._shared.state import TaskScope
@@ -105,25 +105,7 @@ def delete_task_state_store(
 def clear_task_state_store(
     task_instance_id: UUID,
     session: SessionDep,
-    all_map_indices: Annotated[bool, Query()] = False,
 ) -> None:
-    """
-    Delete all task state store keys for this task instance.
-
-    By default, only keys for the requesting TI's exact ``map_index`` are
-    cleared — same isolation as DELETE endpoint above.
-
-    Pass ``?all_map_indices=true`` to wipe state store for every mapped 
sibling of
-    the task in the same DAG run.  This is intentionally fleet-wide: the
-    ``ti:self`` JWT authentication scope authenticates that the caller is
-    a legitimate member of the mapped task group, and grants it authority
-    to reset shared task state store on behalf of the whole group.
-    The SDK only forwards this flag when the user calls 
``task_state_store.clear(all_map_indices=True)``
-    explicitly, so the expanded scope is always an explicit opt-in by the task 
author.
-
-    For non-mapped tasks (``map_index=-1``), there is only ever one index, so
-    ``?all_map_indices=true`` is functionally identical to the default and is
-    accepted without error.
-    """
+    """Delete all task state store keys for this task instance."""
     scope = _get_task_scope_for_ti(task_instance_id, session)
-    get_state_backend().clear(scope, all_map_indices=all_map_indices, 
session=session)
+    get_state_backend().clear(scope, session=session)
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
index d15bf5dc1e0..f544de4f95e 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_state_store.py
@@ -296,25 +296,6 @@ class TestClearTaskState:
             )
             assert remaining_indices == [0, 1]
 
-    def test_clear_with_all_map_indices_query_param_wipes_fleet(
-        self, client: TestClient, create_task_instance: CreateTaskInstance
-    ):
-        """Clear with ?all_map_indices=true wipes state for every mapped 
instance."""
-        ti = create_task_instance(map_index=2)
-        self._seed_fleet_rows(ti, (0, 1, 2))
-
-        response = client.delete(_api_url(ti.id), params={"all_map_indices": 
"true"})
-
-        assert response.status_code == 204
-        with create_session() as session:
-            remaining = session.scalars(
-                select(TaskStateStoreModel).where(
-                    TaskStateStoreModel.dag_id == ti.dag_id,
-                    TaskStateStoreModel.task_id == ti.task_id,
-                )
-            ).all()
-            assert remaining == []
-
 
 class TestTiSelfEnforcement:
     @pytest.fixture
diff --git a/shared/state/src/airflow_shared/state/__init__.py 
b/shared/state/src/airflow_shared/state/__init__.py
index c8c7549ffcd..e2c094d8481 100644
--- a/shared/state/src/airflow_shared/state/__init__.py
+++ b/shared/state/src/airflow_shared/state/__init__.py
@@ -37,9 +37,7 @@ class TaskScope:
 
     ``map_index`` defaults to ``-1`` for non-mapped tasks. For mapped tasks,
     set it to the actual mapped index. ``get``/``set``/``delete`` always match
-    on ``(dag_id, run_id, task_id, map_index)`` exactly. To wipe state across
-    every map index of the task, call ``clear``/``aclear`` with
-    ``all_map_indices=True``.
+    on ``(dag_id, run_id, task_id, map_index)`` exactly.
     """
 
     dag_id: str
@@ -183,8 +181,9 @@ class BaseStoreBackend(ABC):
         Must handle both ``TaskScope`` and ``AssetScope``.
 
         For ``TaskScope``: by default, only keys for the exact ``map_index`` 
on the
-        scope are cleared. Pass ``all_map_indices=True`` to drop the 
``map_index``
-        filter entirely and wipe state across every mapped instance of the 
task.
+        scope are cleared. When ``all_map_indices=True``, the ``map_index`` 
filter is
+        dropped and state is wiped across every mapped instance — for use by 
external
+        callers (UI, CLI) only, not from within a running task.
         For ``AssetScope`` the flag has no effect.
         """
 
@@ -231,8 +230,10 @@ class BaseStoreBackend(ABC):
         Async variant of clear. Must handle both ``TaskScope`` and 
``AssetScope``.
 
         For ``TaskScope``: by default, only keys for the exact ``map_index`` 
on the
-        scope are cleared. Pass ``all_map_indices=True`` to wipe state across 
every
-        mapped instance of the task. For ``AssetScope`` the flag has no effect.
+        scope are cleared. When ``all_map_indices=True``, the ``map_index`` 
filter is
+        dropped and state is wiped across every mapped instance — for use by 
external
+        callers (UI, CLI) only, not from within a running task.
+        For ``AssetScope`` the flag has no effect.
 
         ``session`` is optional. If provided, implementations should use it 
directly.
         If ``None``, implementations manage their own async session internally.
diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index 66496d5cac9..037ddd7eb5f 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -743,10 +743,9 @@ class TaskStateStoreOperations:
         self.client.delete(f"store/ti/{ti_id}/{key}")
         return OKResponse(ok=True)
 
-    def clear(self, ti_id: uuid.UUID, all_map_indices: bool = False) -> 
OKResponse:
+    def clear(self, ti_id: uuid.UUID) -> OKResponse:
         """Clear all task store keys for a task instance via the API server."""
-        params = {"all_map_indices": "true"} if all_map_indices else {}
-        self.client.delete(f"store/ti/{ti_id}", params=params)
+        self.client.delete(f"store/ti/{ti_id}")
         return OKResponse(ok=True)
 
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py 
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index a042d7a1e80..c70b3e9b518 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -943,7 +943,6 @@ class DeleteTaskStateStore(BaseModel):
 
 class ClearTaskStateStore(BaseModel):
     ti_id: UUID
-    all_map_indices: bool = False
     type: Literal["ClearTaskStateStore"] = "ClearTaskStateStore"
 
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py 
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 6753d710df4..8d14f0b63b2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -636,23 +636,17 @@ class TaskStateStoreAccessor:
         if backend is not None:
             backend.delete(self._scope, key)
 
-    def clear(self, all_map_indices: bool = False) -> None:
-        """
-        Delete all keys for this task instance.
-
-        Pass ``all_map_indices=True`` to wipe state across every mapped
-        instance of the task (fleet-wide reset). Defaults to clearing only
-        this task instance's own state.
-        """
+    def clear(self) -> None:
+        """Delete all keys for this task instance."""
         from airflow.sdk.execution_time.comms import ClearTaskStateStore
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
         # cleanup the DB ref first, if backend cleanup fails after this, the 
ref is gone and
         # deterministic keys are recoverable on next set().
-        SUPERVISOR_COMMS.send(ClearTaskStateStore(ti_id=self._ti_id, 
all_map_indices=all_map_indices))
+        SUPERVISOR_COMMS.send(ClearTaskStateStore(ti_id=self._ti_id))
         backend = _get_worker_state_store_backend()
         if backend is not None:
-            backend.clear(self._scope, all_map_indices=all_map_indices)
+            backend.clear(self._scope)
 
     def _clear_backend_only(self) -> None:
         """
diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json 
b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
index 4807d9f53b3..733aab6603d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
+++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
@@ -487,11 +487,6 @@
           "title": "Ti Id",
           "type": "string"
         },
-        "all_map_indices": {
-          "default": false,
-          "title": "All Map Indices",
-          "type": "boolean"
-        },
         "type": {
           "const": "ClearTaskStateStore",
           "default": "ClearTaskStateStore",
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 9f1c71d4164..76dfd2009ac 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1847,7 +1847,7 @@ class ActivitySubprocess(WatchedSubprocess):
             self.client.task_state_store.delete(msg.ti_id, msg.key)
             resp = OKResponse(ok=True)
         elif isinstance(msg, ClearTaskStateStore):
-            self.client.task_state_store.clear(msg.ti_id, 
all_map_indices=msg.all_map_indices)
+            self.client.task_state_store.clear(msg.ti_id)
             resp = OKResponse(ok=True)
         elif isinstance(msg, GetAssetStateStoreByName):
             asset_store = self.client.asset_state_store.get(msg.key, 
name=msg.name)
diff --git a/task-sdk/tests/task_sdk/api/test_client.py 
b/task-sdk/tests/task_sdk/api/test_client.py
index 0d6ee44628a..92b970f7ee5 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -1901,26 +1901,16 @@ class TestTaskStateOperations:
         result = client.task_state_store.delete(ti_id=self.TI_ID, key="job_id")
         assert result == OKResponse(ok=True)
 
-    def test_clear_default_no_query_param(self):
+    def test_clear_sends_delete_request(self):
         def handle_request(request: httpx.Request) -> httpx.Response:
             assert request.method == "DELETE"
             assert request.url.path == f"/store/ti/{self.TI_ID}"
-            assert "all_map_indices" not in str(request.url.query)
             return httpx.Response(status_code=204)
 
         client = make_client(transport=httpx.MockTransport(handle_request))
         result = client.task_state_store.clear(ti_id=self.TI_ID)
         assert result == OKResponse(ok=True)
 
-    def test_clear_all_map_indices_sends_query_param(self):
-        def handle_request(request: httpx.Request) -> httpx.Response:
-            assert "all_map_indices=true" in str(request.url.query)
-            return httpx.Response(status_code=204)
-
-        client = make_client(transport=httpx.MockTransport(handle_request))
-        result = client.task_state_store.clear(ti_id=self.TI_ID, 
all_map_indices=True)
-        assert result == OKResponse(ok=True)
-
 
 class TestAssetStateOperations:
     def test_get_by_name_success(self):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py 
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index 6f09489a88d..f82f4924fba 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1278,23 +1278,12 @@ class TestTaskStateStoreAccessor:
             DeleteTaskStateStore(ti_id=self.TI_ID, key="job_id")
         )
 
-    def test_clear_default_sends_all_map_indices_false(self, 
mock_supervisor_comms):
+    def test_clear_sends_comms_message(self, mock_supervisor_comms):
         mock_supervisor_comms.send.return_value = OKResponse(ok=True)
 
         TaskStateStoreAccessor(ti_id=self.TI_ID, scope=self.SCOPE).clear()
 
-        mock_supervisor_comms.send.assert_called_once_with(
-            ClearTaskStateStore(ti_id=self.TI_ID, all_map_indices=False)
-        )
-
-    def test_clear_all_map_indices_sends_flag_true(self, 
mock_supervisor_comms):
-        mock_supervisor_comms.send.return_value = OKResponse(ok=True)
-
-        TaskStateStoreAccessor(ti_id=self.TI_ID, 
scope=self.SCOPE).clear(all_map_indices=True)
-
-        mock_supervisor_comms.send.assert_called_once_with(
-            ClearTaskStateStore(ti_id=self.TI_ID, all_map_indices=True)
-        )
+        
mock_supervisor_comms.send.assert_called_once_with(ClearTaskStateStore(ti_id=self.TI_ID))
 
     def test_set_datetime_raises_validation_error(self, mock_supervisor_comms):
         """datetime is not JSON-serializable; callers must use .isoformat() 
first."""
@@ -1788,9 +1777,7 @@ class TestTaskStateStoreAccessorWithCustomBackend:
 
         assert "job_id" not in backend._actual_key_value_store
         assert "checkpoint" not in backend._actual_key_value_store
-        mock_supervisor_comms.send.assert_any_call(
-            ClearTaskStateStore(ti_id=self.TI_ID, all_map_indices=False)
-        )
+        
mock_supervisor_comms.send.assert_any_call(ClearTaskStateStore(ti_id=self.TI_ID))
 
 
 class TestAssetStateStoreAccessorWithCustomBackend:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 48932581954..6d350df4b58 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2872,18 +2872,7 @@ REQUEST_TEST_CASES = [
         client_mock=ClientMock(
             method_path="task_state_store.clear",
             args=(TI_ID,),
-            kwargs={"all_map_indices": False},
-            response=OKResponse(ok=True),
-        ),
-        expected_body={"ok": True, "type": "OKResponse"},
-    ),
-    RequestTestCase(
-        message=ClearTaskStateStore(ti_id=TI_ID, all_map_indices=True),
-        test_id="clear_task_store_all_map_indices",
-        client_mock=ClientMock(
-            method_path="task_state_store.clear",
-            args=(TI_ID,),
-            kwargs={"all_map_indices": True},
+            kwargs={},
             response=OKResponse(ok=True),
         ),
         expected_body={"ok": True, "type": "OKResponse"},
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 4fd0692425b..6e7322429ff 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5699,24 +5699,15 @@ class TestTaskInstanceStateOperations:
 
         
mock_supervisor_comms.send.assert_any_call(DeleteTaskStateStore(ti_id=runtime_ti.id,
 key="job_id"))
 
-    @pytest.mark.parametrize(
-        ("call_kwargs", "expected_flag"),
-        [
-            pytest.param({}, False, id="default"),
-            pytest.param({"all_map_indices": True}, True, id="fleet-wipe"),
-        ],
-    )
-    def test_task_can_clear_state(self, call_kwargs, expected_flag, 
create_runtime_ti, mock_supervisor_comms):
+    def test_task_can_clear_state(self, create_runtime_ti, 
mock_supervisor_comms):
         class MyOperator(BaseOperator):
             def execute(self, context):
-                context["task_state_store"].clear(**call_kwargs)
+                context["task_state_store"].clear()
 
         task = MyOperator(task_id="t")
         runtime_ti = create_runtime_ti(task=task)
         run(runtime_ti, context=runtime_ti.get_template_context(), 
log=mock.MagicMock())
-        mock_supervisor_comms.send.assert_any_call(
-            ClearTaskStateStore(ti_id=runtime_ti.id, 
all_map_indices=expected_flag)
-        )
+        
mock_supervisor_comms.send.assert_any_call(ClearTaskStateStore(ti_id=runtime_ti.id))
 
     @staticmethod
     def _watcher_side_effect(msg=None, *args, **kwargs):

Reply via email to