amoghrajesh commented on code in PR #67299: URL: https://github.com/apache/airflow/pull/67299#discussion_r3309616194
########## airflow-core/docs/core-concepts/task-and-asset-state.rst: ########## @@ -0,0 +1,80 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:state-overview: + +Task and Asset State Overview +============================= + +.. versionadded:: 3.3 + +Airflow has always modeled tasks as stateless, idempotent units of work. A growing class of workloads, however, requires a small amount of data to be persisted outside a task's return value, such as a submitted job ID that must survive a worker crash, a watermark that advances run-by-run, or a row counter exposed for observability. Task state and asset state fill that gap without touching the XCom or Variable systems. + +Task and Asset State +-------------------- + +Airflow 3.3 ships two persistent key/value stores, differentiated by *what* they are scoped to: + +.. 
list-table:: + :header-rows: 1 + :widths: 20 25 25 30 + + * - Store + - Scope + - Default lifetime + - Primary use case + * - **Task state** + - A single task instance (dag_id + run_id + task_id + map_index) + - Configurable retention; cleared on task success when ``clear_on_success = True`` + - Survive retries, track in-flight jobs, checkpoint progress within a run + * - **Asset state** + - An asset (independent of any particular run) + - Persists indefinitely; removed only when the asset is deactivated + - Cross-run watermarks, incremental-load cursors, per-asset metadata + +Both stores accept string keys and string values. Values up to 64 KB are supported through the default metastore backend; larger payloads can be offloaded via a :ref:`custom worker-side backend <state-store:worker-backends>`. Review Comment: Values are `JsonValue` now, so we should probably describe them as JSON values rather than strings. The current wording will confuse users who try to store a dict or int. ########## airflow-core/docs/core-concepts/task-state.rst: ########## @@ -0,0 +1,261 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:task-state: + +Task State +========== + +.. 
versionadded:: 3.3 + +Task state is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata. + +Task state is accessed through the task context via ``context["task_state"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``. + + +Accessing task state +-------------------- + +Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary via the ``task_state`` key. From there, you can retrieve, set, or delete individual keys, or clear all keys for the task instance. In this example, the value stored under ``my_key`` is read from task state and then overwritten. + +.. code-block:: python + + import random + + from airflow.sdk import task + + + @task + def my_task(**context): + # Retrieve the task state accessor from the context + task_state = context["task_state"] + my_value = task_state.get("my_key") # None if the key has not been set yet + + # Overwrite the key with a new value (randint is inclusive on both ends) + new_value = f"It is {random.randint(1, 12)} o'clock" + task_state.set("my_key", new_value) + +Reference +------------- + +``get(key)`` +~~~~~~~~~~~~ + +Returns the stored string value, or ``None`` if the key does not exist. + +.. code-block:: python + + value = task_state.get("job_id") # returns str or None + +``set(key, value, *, retention=None)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a key. Note, ``value`` must be a string. Review Comment: Same JsonValue issue. We should say values can be any JSON-compatible type (str, int, float, bool, list, dict). ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. Review Comment: Broken cross-reference: `asset-state.rst` lives at `core-concepts/asset-state`, not `authoring-and-scheduling/asset-state`, so this link should probably point to `/core-concepts/asset-state`. ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. 
Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. Review Comment: ```suggestion Number of rows deleted per batch during garbage collection cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. ``` ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. 
versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup.
Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. + +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. 
This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. + * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. + +Dispatching on scope type +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. Use a ``match`` statement to dispatch: + +.. code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class MyBackend(BaseStateBackend): + def get(self, scope, key, *, session=None): + if scope == TaskScope(): + return self._task_store.get(scope, key) + elif scope == AssetScope(): + return self._asset_store.get(scope, key) Review Comment: Should use `isinstance` checks, e.g. `isinstance(scope, TaskScope)`. `scope == TaskScope()` compares against a freshly constructed instance instead of checking the type, so it is unlikely to match a real, populated scope. 
The prose above also says to use a `match` statement, while the example uses `if`/`elif`. ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. 
code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below.
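The reference round trip described in this hunk can be sketched in plain Python. This is a minimal, standalone illustration, not Airflow code. `InMemoryRefBackend`, `serialize_to_ref`, `deserialize_from_ref`, `FakeExecutionAPI`, `set_state` and `get_state` are hypothetical stand-ins for the worker-side backend, `serialize_task_state_to_ref()`, `deserialize_task_state_from_ref()`, the Execution API and `TaskStateAccessor`. Their real signatures are not shown in this PR.

```python
import json
import uuid


class InMemoryRefBackend:
    """Hypothetical worker-side backend that keeps payloads off the API server."""

    def __init__(self):
        # Stands in for worker-side storage such as an object store bucket.
        self._blobs = {}

    def serialize_to_ref(self, value):
        # Persist the JSON-encoded payload locally and return a compact reference.
        ref = f"mem://{uuid.uuid4().hex}"
        self._blobs[ref] = json.dumps(value)
        return ref

    def deserialize_from_ref(self, ref):
        # Resolve a reference received from the API back into the payload.
        return json.loads(self._blobs[ref])


class FakeExecutionAPI:
    """Hypothetical stand-in for the Execution API: stores only what it is sent."""

    def __init__(self):
        self.rows = {}


def set_state(api, backend, key, value):
    # Only the reference string reaches the (fake) Execution API.
    api.rows[key] = backend.serialize_to_ref(value)


def get_state(api, backend, key):
    # Fetch the stored reference, then resolve it on the worker.
    ref = api.rows.get(key)
    return None if ref is None else backend.deserialize_from_ref(ref)


api, backend = FakeExecutionAPI(), InMemoryRefBackend()
set_state(api, backend, "job", {"id": 42, "status": "running"})
print(api.rows["job"].startswith("mem://"))  # True
print(get_state(api, backend, "job"))  # {'id': 42, 'status': 'running'}
```

In this sketch the API-side row only ever holds the `mem://` reference, and the payload itself stays in worker-side storage. That matches the distinction the suggested wording below draws between the value sent to the Execution API and the actual stored data.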
Review Comment: ```suggestion When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the returned value (a reference to the actual storage) to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference from the Execution API. See `Custom worker-side backends`_ below. ``` ########## airflow-core/docs/core-concepts/asset-state.rst: ########## @@ -0,0 +1,221 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:asset-state: + +Asset State +=========== + +.. versionadded:: 3.3 + +Asset state is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task state </core-concepts/task-state>`, which is tied to a single task instance, asset state persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset state is accessed through the task context via ``context["asset_state"]``, or via the ``AssetState`` Task SDK mechanism. + + +When is ``asset_state`` available? 
+------------------------------------ + + ``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. It is *not* available for :class:`~airflow.sdk.definitions.asset.AssetAlias` inlets. Aliases are resolved at runtime, but asset state accessors are only created for concrete assets declared directly on the task. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. + +Asset state is also available using ``AssetState``, provided by the Task SDK. This approach is more common when building ``BaseEventTrigger`` subclasses, for example for asset watching. + +.. warning:: + + **Outlets-only tasks**: if a task declares only ``outlets`` (no ``inlets``), ``context["asset_state"][my_asset]`` may raise a ``KeyError`` at runtime. The workaround is to declare the asset in **both** ``inlets`` and ``outlets``. + + .. code-block:: python + + # my_asset defined above ... + + @task(inlets=[my_asset], outlets=[my_asset]) + def write_asset(**context): + context["asset_state"][my_asset].set("watermark", "2024-01-01") + + This known issue will be resolved in a future release. Alternatively, use the ``AssetState`` class provided by the Task SDK, described below. + + +Accessing asset state using ``context`` +--------------------------------------- + +To make an asset's state available through the context, include the asset in ``inlets`` (or in both ``inlets`` and ``outlets``). Then subscript ``context["asset_state"]`` with the asset object to get its state accessor. + +.. 
code-block:: python + + from airflow.sdk import Asset, DAG, task + + my_asset = Asset("my_data", uri="s3://bucket/my_data") + + with DAG("example_asset_state", schedule=None): + + @task(inlets=[my_asset], outlets=[my_asset]) + def process(**context): + asset_state = context["asset_state"][my_asset] + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +Accessing asset state using ``AssetState`` +------------------------------------------ + +Asset state can also be retrieved for an asset using the ``airflow.sdk.AssetState`` class. This approach does **not** require the asset to be passed to the task through ``inlets``. + +.. code-block:: python + + from airflow.sdk import DAG, task, AssetState + + with DAG("example_asset_state", schedule=None): + + @task() + def process(): + asset_state = AssetState(name="my_data") + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +In the example above, the ``name`` of the asset is used to retrieve its state. However, the ``uri`` can also be used: + +.. code-block:: python + + from airflow.sdk import DAG, task, AssetState + + with DAG("example_asset_state", schedule=None): + + @task() + def process(): + asset_state = AssetState(uri="s3://bucket/my_data") + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + Review Comment: Maybe remove this for now; you can revisit it once the dependent PR lands. ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. 
http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. 
Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e.
``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. + +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. + * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. + +Dispatching on scope type +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. 
Use a ``match`` statement to dispatch: + +.. code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class MyBackend(BaseStateBackend): + def get(self, scope, key, *, session=None): + if scope == TaskScope(): + return self._task_store.get(scope, key) + elif scope == AssetScope(): + return self._asset_store.get(scope, key) + +:class:`~airflow.sdk.state.AssetScope` has three optional fields: ``asset_id`` (integer, server-side only), ``name``, and ``uri``. At least one must be set. Server-side operations (REST API calls) provide ``asset_id``. Worker-side operations provide ``name`` or ``uri`` (workers do not have access to the integer ``asset_id``). + +Configure the class via ``[state_store] backend``: + +.. code-block:: ini + + [state_store] + backend = mypackage.state.MyBackend + + +Custom worker-side backends +---------------------------- + +Worker-side backends extend ``BaseStateBackend`` with two pairs of serialization hooks. They are configured separately via ``[workers] state_backend`` and run *on the worker process*, not on the API server. This lets you store large payloads or credentialed data directly on worker infrastructure while only a compact reference string is kept in the database. Review Comment: ```suggestion Worker-side backends extend ``BaseStateBackend`` with two pairs of serialization hooks. They are configured separately via ``[workers] state_backend`` and run *on the worker process*, not on the API server. This lets you store large payloads or credentialed data directly using worker infrastructure while only a compact reference string is kept in the database. ``` Not sure if this is what you meant? ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </core-concepts/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +..
code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted.
``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. + +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. + * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. 
+ +Dispatching on scope type +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. Use a ``match`` statement with class patterns to dispatch on the scope type: + +.. code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class MyBackend(BaseStateBackend): + def get(self, scope, key, *, session=None): + match scope: + case TaskScope(): + return self._task_store.get(scope, key) + case AssetScope(): + return self._asset_store.get(scope, key) + case _: + raise TypeError(f"Unsupported scope: {scope!r}") + +:class:`~airflow.sdk.state.AssetScope` has three optional fields: ``asset_id`` (integer, server-side only), ``name``, and ``uri``. At least one must be set. Server-side operations (REST API calls) provide ``asset_id``. Worker-side operations provide ``name`` or ``uri`` (workers do not have access to the integer ``asset_id``). + +Configure the class via ``[state_store] backend``: + +.. code-block:: ini + + [state_store] + backend = mypackage.state.MyBackend + + +Custom worker-side backends +---------------------------- + +Worker-side backends extend ``BaseStateBackend`` with two pairs of serialization hooks. They are configured separately via ``[workers] state_backend`` and run *on the worker process*, not on the API server. This lets you store large payloads or credentialed data directly on worker infrastructure while only a compact reference string is kept in the database. + +Override these four methods: + +``serialize_task_state_to_ref(*, value, key, ti_id)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.set()`` before sending the value to the Execution API. Return a reference string (e.g. an S3 key) that will be stored in the database instead of the raw value. + +``deserialize_task_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.get()`` after retrieving the reference string from the Execution API.
Return the actual value. + +``serialize_asset_state_to_ref(*, value, key, asset_ref)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Same as the task variant, but for asset state. ``asset_ref`` is the asset name or URI, depending on how the accessor was constructed. + +``deserialize_asset_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``AssetStateAccessor.get()`` to resolve the stored reference back to the actual value. + +.. important:: + + **References must be deterministic.** Given the same inputs (``ti_id`` + ``key`` for task state; ``asset_ref`` + ``key`` for asset state), the serialization method must always return the same reference string. Do not embed timestamps, random UUIDs, or any other non-deterministic component in the reference path. + + When a key is deleted or cleared, Airflow clears the database reference *first*, then calls the backend's ``delete()`` or ``clear()`` method. If backend cleanup fails after the DB row is gone, the external object is orphaned. Because the reference is deterministic, a subsequent ``set()`` for the same key will overwrite the orphaned object, making the failure recoverable. A non-deterministic reference would leave the external object permanently orphaned with no way to locate it. + +Example skeleton: + +.. 
code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class S3StateBackend(BaseStateBackend): + + def _task_ref(self, ti_id: str, key: str) -> str: + return f"airflow/task-state/{ti_id}/{key}" + + def _asset_ref(self, asset_ref: str, key: str) -> str: + import hashlib + + safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16] + return f"airflow/asset-state/{safe}/{key}" + + def serialize_task_state_to_ref(self, *, value, key, ti_id): + s3_key = self._task_ref(ti_id, key) + s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=value.encode()) + return s3_key + + def deserialize_task_state_from_ref(self, stored): + s3_object = s3_client.get_object(Bucket=BUCKET, Key=stored) + return s3_object["Body"].read().decode() Review Comment: ```suggestion def deserialize_task_state_from_ref(self, stored): s3_object = s3_client.get_object(Bucket=BUCKET, Key=stored) return json.loads(s3_object["Body"].read().decode()) ``` ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. 
versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </core-concepts/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup.
Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. + +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. 
This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. + * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. + +Dispatching on scope type +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. Use a ``match`` statement with class patterns to dispatch on the scope type: + +.. code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class MyBackend(BaseStateBackend): + def get(self, scope, key, *, session=None): + match scope: + case TaskScope(): + return self._task_store.get(scope, key) + case AssetScope(): + return self._asset_store.get(scope, key) + case _: + raise TypeError(f"Unsupported scope: {scope!r}") + +:class:`~airflow.sdk.state.AssetScope` has three optional fields: ``asset_id`` (integer, server-side only), ``name``, and ``uri``. At least one must be set. Server-side operations (REST API calls) provide ``asset_id``.
Worker-side operations provide ``name`` or ``uri`` (workers do not have access to the integer ``asset_id``). + +Configure the class via ``[state_store] backend``: + +.. code-block:: ini + + [state_store] + backend = mypackage.state.MyBackend + + +Custom worker-side backends +---------------------------- + +Worker-side backends extend ``BaseStateBackend`` with two pairs of serialization hooks. They are configured separately via ``[workers] state_backend`` and run *on the worker process*, not on the API server. This lets you store large payloads or credentialed data directly on worker infrastructure while only a compact reference string is kept in the database. + +Override these four methods: + +``serialize_task_state_to_ref(*, value, key, ti_id)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.set()`` before sending the value to the Execution API. Return a reference string (e.g. an S3 key) that will be stored in the database instead of the raw value. + +``deserialize_task_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.get()`` after retrieving the reference string from the Execution API. Return the actual value. + +``serialize_asset_state_to_ref(*, value, key, asset_ref)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Same as the task variant, but for asset state. ``asset_ref`` is the asset name or URI, depending on how the accessor was constructed. + +``deserialize_asset_state_from_ref(stored)`` Review Comment: True signatures pls. ########## airflow-core/docs/core-concepts/asset-state.rst: ########## @@ -0,0 +1,221 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:asset-state: + +Asset State +=========== + +.. versionadded:: 3.3 + +Asset state is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task state </core-concepts/task-state>`, which is tied to a single task instance, asset state persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset state is accessed through the task context via ``context["asset_state"]``, or via the ``AssetState`` Task SDK mechanism. + + +When is ``asset_state`` available? +------------------------------------ + +``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. It is *not* available for :class:`~airflow.sdk.definitions.asset.AssetAlias` inlets. Aliases are resolved at runtime, but asset state accessors are only created for concrete assets declared directly on the task. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. + +Asset state is also available using ``AssetState``, provided by the Task SDK. This approach is typically used when building ``BaseEventTrigger`` subclasses for use cases such as asset watching. + +..
warning:: + + **Outlets-only tasks**: if a task declares only ``outlets`` (no ``inlets``), ``context["asset_state"][my_asset]`` may raise a ``KeyError`` at runtime. The workaround is to declare the asset in **both** ``inlets`` and ``outlets``. + + .. code-block:: python + + # my_asset defined above ... + + @task(inlets=[my_asset], outlets=[my_asset]) + def write_asset(**context): + context["asset_state"][my_asset].set("watermark", "2024-01-01") + + This is a known issue that is expected to be fixed in a future release. Alternatively, use the Task SDK's ``AssetState`` class, described below, which does not require the asset to be declared in ``inlets``. Review Comment: Guess we should just fix this instead ########## airflow-core/docs/core-concepts/task-state.rst: ########## @@ -0,0 +1,261 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:task-state: + +Task State +========== + +.. versionadded:: 3.3 + +Task state is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata.
+ +Task state is accessed through the task context via ``context["task_state"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``. + + +Accessing task state +-------------------- + +Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary via the ``task_state`` key. From there, you can read, write, or delete individual keys, or clear every key stored for the task instance. In this example, the value stored under ``my_key`` is read from task state and then overwritten. + +.. code-block:: python + + import random + + from airflow.sdk import task + + + @task + def my_task(**context): + # Retrieve the task state accessor from the context + task_state = context["task_state"] + previous_value = task_state.get("my_key") # None if the key has never been set + print(f"Previous value: {previous_value}") + + # Overwrite the key with a new value (a random hour from 1 to 12) + new_value = f"It is {random.randint(1, 12)} o'clock" + task_state.set("my_key", new_value) + +Reference +------------- + +``get(key)`` +~~~~~~~~~~~~ + +Returns the stored string value, or ``None`` if the key does not exist. + +.. code-block:: python + + value = task_state.get("job_id") # returns str or None + +``set(key, value, *, retention=None)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a key. Note that ``value`` must be a string. + +The optional ``retention`` argument controls when the key expires: + +* ``timedelta(...)``: expire after the given duration from the time of the write (e.g. ``timedelta(hours=6)``). The expiry timestamp is computed on the worker in UTC before the value is sent to the API server. +* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage collection, regardless of the global ``[state_store] default_retention_days`` setting. +* ``None`` (default): fall back to the global ``[state_store] default_retention_days`` config. + +.. important:: + + ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain integer number of days. Passing an integer raises a ``TypeError``. + + ..
code-block:: python + + # correct + task_state.set("key", "val", retention=timedelta(days=7)) + + # wrong — raises TypeError + task_state.set("key", "val", retention=7) + +``NEVER_EXPIRE`` sentinel +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``: + +.. code-block:: python + + from airflow.sdk.execution_time.context import NEVER_EXPIRE + + task_state.set("job_id", job_id, retention=NEVER_EXPIRE) + +``delete(key)`` +~~~~~~~~~~~~~~~ + +Deletes a single key. No-op if the key does not exist. + +.. code-block:: python + + task_state.delete("job_id") + +``clear(all_map_indices=False)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Deletes *all* state keys for this task instance. + +For :doc:`mapped tasks <dynamic-task-mapping>`, the default clears only the current map index. Pass ``all_map_indices=True`` to wipe state across **every** mapped instance of the task (fleet-wide reset). + +.. code-block:: python + + # clear only this map index + task_state.clear() + + # clear all map indices (fleet-wide) + task_state.clear(all_map_indices=True) + + +Use Cases +--------- + +External job resumption +~~~~~~~~~~~~~~~~~~~~~~~ + +A common pattern for long-running external jobs: check whether a job ID is already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives +the default retention window. + +.. code-block:: python + + from datetime import timedelta + + from airflow.sdk import DAG, task + from airflow.sdk.execution_time.context import NEVER_EXPIRE + + + with DAG("spark_job_dag", schedule=None): + + @task + def run_spark_job(**context): + task_state = context["task_state"] + + # Check for an already-submitted job from a previous attempt. + job_id = task_state.get("job_id") + if job_id is None: + job_id = spark_client.submit_job(...) 
+ # Store with NEVER_EXPIRE so the key is not garbage-collected before the job finishes + task_state.set("job_id", str(job_id), retention=NEVER_EXPIRE) + + # Reattach to the job and wait for completion. + result = spark_client.wait_for_completion(job_id) + return result + +On a retry, the task finds the stored ``job_id`` and reattaches instead of submitting a duplicate job. + +Intra-task checkpointing +~~~~~~~~~~~~~~~~~~~~~~~~ + +For tasks that process paginated or batched data, store the last-completed offset so a retry can resume mid-stream rather than restarting from the beginning. + +.. code-block:: python + + from airflow.sdk import DAG, task + + + with DAG("paginated_ingest", schedule="@daily"): + + @task + def ingest_pages(**context): + task_state = context["task_state"] + + raw = task_state.get("last_page") + start_page = int(raw) + 1 if raw is not None else 1 + + for page in range(start_page, total_pages + 1): + fetch_and_load(page) + task_state.set("last_page", str(page)) + +On a retry, the task reads ``last_page`` and skips pages that were already processed. + +Progress metadata +~~~~~~~~~~~~~~~~~ + +Task state can expose in-progress metrics for observability — row counts, status strings, or lightweight JSON payloads — without requiring XCom or an external system. + +.. code-block:: python + + import json + + from airflow.sdk import DAG, task + + + with DAG("row_ingest", schedule="@hourly"): + + @task + def ingest_rows(**context): + task_state = context["task_state"] + total = 0 + + for batch in get_batches(): + load(batch) + total += len(batch) + task_state.set( + "progress", + json.dumps({ + "rows_loaded": total, + "status": "running" + }), + ) + + task_state.set( + "progress", + json.dumps({ + "rows_loaded": total, + "status": "done" + }), Review Comment: No need to do json.dumps anymore :) ########## airflow-core/docs/core-concepts/asset-state.rst: ########## @@ -0,0 +1,221 @@ + .. 
Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:asset-state: + +Asset State +=========== + +.. versionadded:: 3.3 + +Asset state is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task state </core-concepts/task-state>`, which is tied to a single task instance, asset state persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset state is accessed through the task context via ``context["asset_state"]``, or via the ``AssetState`` Task SDK mechanism. + + +When is ``asset_state`` available? +------------------------------------ + +``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. It is *not* available for :class:`~airflow.sdk.definitions.asset.AssetAlias` inlets. Aliases are resolved at runtime, but asset state accessors are only created for concrete assets declared directly on the task. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. + +Asset state is also available using ``AssetState``, provided by the Task SDK. 
This approach is typically used when building ``BaseEventTrigger`` subclasses for use cases such as asset watching. + +.. warning:: + + **Outlets-only tasks**: if a task declares only ``outlets`` (no ``inlets``), ``context["asset_state"][my_asset]`` may raise a ``KeyError`` at runtime. The workaround is to declare the asset in **both** ``inlets`` and ``outlets``. + + .. code-block:: python + + # my_asset defined above ... + + @task(inlets=[my_asset], outlets=[my_asset]) + def write_asset(**context): + context["asset_state"][my_asset].set("watermark", "2024-01-01") + + This is a known issue that is expected to be fixed in a future release. Alternatively, use the Task SDK's ``AssetState`` class, described below, which does not require the asset to be declared in ``inlets``. + + +Accessing asset state using ``context`` +--------------------------------------- + +To make an asset's state available through ``context``, declare the asset in ``inlets`` (or in both ``inlets`` and ``outlets``). Then subscript ``context["asset_state"]`` with the asset object to get its state accessor. + +.. code-block:: python + + from airflow.sdk import Asset, DAG, task + + my_asset = Asset("my_data", uri="s3://bucket/my_data") + + with DAG("example_asset_state", schedule=None): + + @task(inlets=[my_asset], outlets=[my_asset]) + def process(**context): + asset_state = context["asset_state"][my_asset] + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +Accessing asset state using ``AssetState`` +------------------------------------------ + +You can also access an asset's state with the ``airflow.sdk.AssetState`` class. This approach does **not** require the asset to be passed to the task through ``inlets``. + +..
code-block:: python + from airflow.sdk import DAG, task, AssetState + + with DAG("example_asset_state", schedule=None): + + @task() + def process(): + asset_state = AssetState(name="my_data") + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +In the example above, the ``name`` of the asset is used to retrieve its state. However, the ``uri`` can also be used: + +.. code-block:: python + from airflow.sdk import DAG, task, AssetState + + with DAG("example_asset_state", schedule=None): + + @task() + def process(): + asset_state = AssetState(uri="s3://bucket/my_data") + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +Single-inlet shorthand +~~~~~~~~~~~~~~~~~~~~~~~ + +For tasks with exactly **one** concrete inlet, you can call ``get``, ``set``, ``delete``, and ``clear`` directly on ``context["asset_state"]`` without subscripting. + +.. code-block:: python + + @task(inlets=[my_asset], outlets=[my_asset]) + def process_single(**context): + asset_state = context["asset_state"] + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +If the task has more than one concrete inlet, calling the shorthand raises a ``ValueError``. Use the subscript form (``context["asset_state"][my_asset]``) whenever a task has multiple inlets. + + +API reference +------------- + +The following methods are available on the per-asset accessor (``context["asset_state"][my_asset]``), on the shorthand (``context["asset_state"]``) when the task has exactly one inlet, and on the ``AssetState`` Task SDK class. + +``get(key)`` +~~~~~~~~~~~~ + +Returns the stored string value, or ``None`` if the key does not exist. Review Comment: It returns a `JsonValue`, so maybe mention a JSON string? ########## airflow-core/docs/core-concepts/asset-state.rst: ########## @@ -0,0 +1,221 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:asset-state: + +Asset State +=========== + +.. versionadded:: 3.3 + +Asset state is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task state </core-concepts/task-state>`, which is tied to a single task instance, asset state persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset state is accessed through the task context via ``context["asset_state"]``, or via the ``AssetState`` Task SDK mechanism. + + +When is ``asset_state`` available? +------------------------------------ + +``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. It is *not* available for :class:`~airflow.sdk.definitions.asset.AssetAlias` inlets. Aliases are resolved at runtime, but asset state accessors are only created for concrete assets declared directly on the task. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. + +Asset state is also available using ``AssetState``, provided by the Task SDK. This approach is more commonly used when building ``BaseEventTrigger``'s for things like asset-watching. 
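A minimal sketch of the cross-run watermark pattern that asset state is meant to support. To keep it runnable without Airflow, `InMemoryAssetState` is a hypothetical stand-in that exposes the same `get`/`set`/`delete`/`clear` methods the draft documents for `context["asset_state"][my_asset]` and `AssetState(name="my_data")`. The record shape and the `updated_at` field are also assumptions made for illustration.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any, Iterable


class InMemoryAssetState:
    """Hypothetical stand-in for an asset state accessor (not an Airflow class)."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        # Mirrors the documented behaviour: None when the key is missing.
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        # No-op if the key does not exist.
        self._data.pop(key, None)

    def clear(self) -> None:
        self._data.clear()


def incremental_load(asset_state: Any, records: Iterable[dict]) -> list[dict]:
    """Return records newer than the stored watermark, then advance the watermark."""
    raw = asset_state.get("watermark")
    watermark = datetime.fromisoformat(raw) if raw is not None else None

    new_records = [
        r
        for r in records
        if watermark is None or datetime.fromisoformat(r["updated_at"]) > watermark
    ]
    if new_records:
        latest = max(datetime.fromisoformat(r["updated_at"]) for r in new_records)
        # Store an ISO-8601 string, which is a valid JSON value.
        asset_state.set("watermark", latest.isoformat())
    return new_records
```

Because the watermark is keyed on the asset rather than on a task instance, a second run (or, per the draft, a trigger reading the same asset) only sees records newer than the stored value.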
+ +.. warning:: + + **Outlets-only tasks**: if a task declares only ``outlets`` (no ``inlets``), ``context["asset_state"][my_asset]`` may raise a ``KeyError`` at runtime. The workaround is to declare the asset in **both** ``inlets`` and ``outlets``. + + .. code-block:: python + # my_asset defined above ... + + @task(inlets=[my_asset], outlets=[my_asset]) + def write_asset(**context): + context["asset_state"][my_asset].set("watermark", "2024-01-01") + + This known issue will be resolved in a future release. A workaround for this is to use the ``AssetState`` class, provided as part of the Task SDK. More details on that below! + + +Accessing asset state using ``context`` +--------------------------------------- + +An asset can be brought into "scope" (for lack of a better phrase) by including it in ``inlets`` (or both ``inlets`` and ``outlets``). Then subscript ``context["asset_state"]`` with the asset object to retrieve the asset state. + +.. code-block:: python + + from airflow.sdk import Asset, DAG, task + + my_asset = Asset("my_data", uri="s3://bucket/my_data") + + with DAG("example_asset_state", schedule=None): + + @task(inlets=[my_asset], outlets=[my_asset]) + def process(**context): + asset_state = context["asset_state"][my_asset] + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +Accessing asset state using ``AssetState`` +------------------------------------------ + +Asset state can also be retrieved for an asset using the ``airflow.sdk.AssetState`` class. This approach does NOT require that an asset is passed to a task using ``inlets``. + +.. code-block:: python + from airflow.sdk import DAG, task, AssetState + + with DAG("example_asset_state", schedule=None): + + @task() + def process(): + asset_state = AssetState(name="my_data") + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +In the example above, the ``name`` of the asset is used to retrieve its state.
However, the ``uri`` can also be used: + +.. code-block:: python + from airflow.sdk import DAG, task, AssetState + + with DAG("example_asset_state", schedule=None): + + @task() + def process(): + asset_state = AssetState(uri="s3://bucket/my_data") + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +Single-inlet shorthand +~~~~~~~~~~~~~~~~~~~~~~~ + +For tasks with exactly **one** concrete inlet, you can call ``get``, ``set``, ``delete``, and ``clear`` directly on ``context["asset_state"]`` without subscripting. + +.. code-block:: python + + @task(inlets=[my_asset], outlets=[my_asset]) + def process_single(**context): + asset_state = context["asset_state"] + watermark = asset_state.get("watermark") + asset_state.set("watermark", "2024-06-01") + +If the task has more than one concrete inlet, calling the shorthand raises a ``ValueError``. Use the subscript form (``context["asset_state"][my_asset]``) whenever a task has multiple inlets. + + +API reference +------------- + +The following methods are available on the per-asset accessor (``context["asset_state"][my_asset]``), on the shorthand (``context["asset_state"]``) when the task has exactly one inlet, and on the ``AssetState`` Task SDK class. + +``get(key)`` +~~~~~~~~~~~~ + +Returns the stored string value, or ``None`` if the key does not exist. + +.. code-block:: python + + # Using context + watermark = context["asset_state"][my_asset].get("watermark") + + # Using the Task SDK + AssetState(name="my_data").get("watermark") + +``set(key, value)`` +~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a key-value pair. Unlike task state, asset state has no ``retention`` parameter. Values persist until explicitly deleted or until the asset is deactivated. + +.. 
code-block:: python + + # Using context + context["asset_state"][my_asset].set("watermark", "2024-06-01T00:00:00Z") + + # Using the Task SDK class + AssetState(name="my_data").set("watermark", "2024-06-01T00:00:00Z") + +``delete(key)`` +~~~~~~~~~~~~~~~ + +Deletes a single key. No-op if the key does not exist. + +.. code-block:: python + + # Using context + context["asset_state"][my_asset].delete("watermark") + + # Using the Task SDK class + AssetState(name="my_data").delete("watermark") + +``clear()`` +~~~~~~~~~~~ + +Deletes *all* state keys for the asset. + +.. code-block:: python + + # Using context + context["asset_state"][my_asset].clear() + + # Using the Task SDK class + AssetState(name="my_data").clear() + +Use cases +--------- + +Watermark pattern +~~~~~~~~~~~~~~~~~ + +The canonical use case for asset state is an incremental-load task that advances a watermark on each run. The watermark is stored on the asset itself so any task that reads or writes that asset can access it. *This use case is especially applicable when building things like asset "watchers" using ``BaseEventTrigger``'s. Review Comment: ```suggestion The canonical use case for asset state is an incremental-load task that advances a watermark on each run. The watermark is stored on the asset itself so any task that reads or writes that asset can access it. This use case is especially applicable when building things like asset "watchers" using ``BaseEventTrigger``'s. ``` Formatting incident? ########## airflow-core/docs/core-concepts/task-state.rst: ########## @@ -0,0 +1,261 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. 
http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:task-state: + +Task State +========== + +.. versionadded:: 3.3 + +Task state is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata. + +Task state is accessed through the task context via ``context["task_state"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``. + + +Accessing task state +-------------------- + +Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task state is available through the ``context`` dictionary via the ``task_state`` key. From there, it can be used to retrieve, set, delete, or clear task state for a specific key-value pair. In this example, the value stored under ``my_key`` is retrieved from task state, then updated. + +.. code-block:: python + from airflow.sdk import task + import random + + @task + def my_task(**context): + # Retrieve task_state from context + task_state = context["task_state"] + my_value = task_state.get("my_key") + + # Set the new value (randint is inclusive, so the hour is 1 to 12) + new_value = f"It is {random.randint(1, 12)} o'clock" + task_state.set("my_key", new_value) + +Reference +------------- + +``get(key)`` +~~~~~~~~~~~~ + +Returns the stored string value, or ``None`` if the key does not exist. + +.. code-block:: python + + value = task_state.get("job_id") # returns str or None + +``set(key, value, *, retention=None)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Writes or overwrites a key. Note, ``value`` must be a string.
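The draft says ``value`` must be a string, while the review comments on this PR note that values are now `JsonValue`. Assuming the `JsonValue` behaviour, here is a minimal sketch of retry-safe checkpointing with `get`/`set`, storing the page number as an int as in the reviewer's suggestion for the paginated-ingest example. `InMemoryTaskState`, the `ingest_pages` signature, and the loader callback are hypothetical stand-ins so the sketch runs without Airflow.

```python
from __future__ import annotations

from typing import Any, Callable


class InMemoryTaskState:
    """Hypothetical stand-in for ``context["task_state"]``.

    Reusing the same instance across calls simulates state surviving a retry.
    """

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


def ingest_pages(
    task_state: Any, total_pages: int, fetch_and_load: Callable[[int], None]
) -> None:
    # Assumes JsonValue storage: the int written below comes back as an int.
    raw = task_state.get("last_page")
    start_page = raw + 1 if raw is not None else 1

    for page in range(start_page, total_pages + 1):
        fetch_and_load(page)
        # Checkpoint only after the page has been loaded successfully.
        task_state.set("last_page", page)
```

If the first attempt fails while loading page 3, the checkpoint holds 2, so the retry starts at page 3 and pages 1 and 2 are not fetched again.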
+ +The optional ``retention`` argument controls when the key expires: + +* ``timedelta(...)``: expire after the given duration from the time of the write (e.g. ``timedelta(hours=6)``). The expiry timestamp is computed on the worker in UTC before the value is sent to the API server. +* ``NEVER_EXPIRE``: the key never expires and is skipped by garbage collection, regardless of the global ``[state_store] default_retention_days`` setting. +* ``None`` (default): fall back to the global ``[state_store] default_retention_days`` config. + +.. important:: + + ``retention`` accepts only a :class:`~datetime.timedelta`, not a plain integer number of days. Passing an integer raises a ``TypeError``. + + .. code-block:: python + + # correct + task_state.set("key", "val", retention=timedelta(days=7)) + + # wrong — raises TypeError + task_state.set("key", "val", retention=7) + +``NEVER_EXPIRE`` sentinel +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Import ``NEVER_EXPIRE`` from ``airflow.sdk.execution_time.context``: + +.. code-block:: python + + from airflow.sdk.execution_time.context import NEVER_EXPIRE + + task_state.set("job_id", job_id, retention=NEVER_EXPIRE) + +``delete(key)`` +~~~~~~~~~~~~~~~ + +Deletes a single key. No-op if the key does not exist. + +.. code-block:: python + + task_state.delete("job_id") + +``clear(all_map_indices=False)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Deletes *all* state keys for this task instance. + +For :doc:`mapped tasks <dynamic-task-mapping>`, the default clears only the current map index. Pass ``all_map_indices=True`` to wipe state across **every** mapped instance of the task (fleet-wide reset). + +.. 
code-block:: python + + # clear only this map index + task_state.clear() + + # clear all map indices (fleet-wide) + task_state.clear(all_map_indices=True) + + +Use Cases +--------- + +External job resumption +~~~~~~~~~~~~~~~~~~~~~~~ + +A common pattern for long-running external jobs: check whether a job ID is already stored before submitting, and use ``NEVER_EXPIRE`` so the key outlives +the default retention window. + +.. code-block:: python + + from datetime import timedelta + + from airflow.sdk import DAG, task + from airflow.sdk.execution_time.context import NEVER_EXPIRE + + + with DAG("spark_job_dag", schedule=None): + + @task + def run_spark_job(**context): + task_state = context["task_state"] + + # Check for an already-submitted job from a previous attempt. + job_id = task_state.get("job_id") + if job_id is None: + job_id = spark_client.submit_job(...) + # Store with NEVER_EXPIRE so the key is not garbage-collected before the job finishes + task_state.set("job_id", str(job_id), retention=NEVER_EXPIRE) + + # Reattach to the job and wait for completion. + result = spark_client.wait_for_completion(job_id) + return result + +On a retry, the task finds the stored ``job_id`` and reattaches instead of submitting a duplicate job. + +Intra-task checkpointing +~~~~~~~~~~~~~~~~~~~~~~~~ + +For tasks that process paginated or batched data, store the last-completed offset so a retry can resume mid-stream rather than restarting from the beginning. + +.. 
code-block:: python + + from airflow.sdk import DAG, task + + + with DAG("paginated_ingest", schedule="@daily"): + + @task + def ingest_pages(**context): + task_state = context["task_state"] + + raw = task_state.get("last_page") + start_page = int(raw) + 1 if raw is not None else 1 + + for page in range(start_page, total_pages + 1): + fetch_and_load(page) + task_state.set("last_page", str(page)) + Review Comment: ```suggestion @task def ingest_pages(**context): task_state = context["task_state"] raw = task_state.get("last_page") start_page = raw + 1 if raw is not None else 1 for page in range(start_page, total_pages + 1): fetch_and_load(page) task_state.set("last_page", page) ``` ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. 
This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. 
code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: Review Comment: There's no scheduler side job here. Cleanup is triggered by a CLI command (`airflow state-store cleanup-task-states`) as done in: https://github.com/apache/airflow/pull/66463. Also worth noting: the cleanup only works for MetastoreStateBackend. Custom backends are explicitly skipped ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. 
code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. 
+ +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. + * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. + +Dispatching on scope type +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. Use a ``match`` statement to dispatch: + +.. 
code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class MyBackend(BaseStateBackend): + def get(self, scope, key, *, session=None): + match scope: + case TaskScope(): + return self._task_store.get(scope, key) + case AssetScope(): + return self._asset_store.get(scope, key) + +:class:`~airflow.sdk.state.AssetScope` has three optional fields: ``asset_id`` (integer, server-side only), ``name``, and ``uri``. At least one must be set. Server-side operations (REST API calls) provide ``asset_id``. Worker-side operations provide ``name`` or ``uri`` (workers do not have access to the integer ``asset_id``). + +Configure the class via ``[state_store] backend``: + +.. code-block:: ini + + [state_store] + backend = mypackage.state.MyBackend + + +Custom worker-side backends +---------------------------- + +Worker-side backends extend ``BaseStateBackend`` with two pairs of serialization hooks. They are configured separately via ``[workers] state_backend`` and run *on the worker process*, not on the API server. This lets you store large payloads or credentialed data directly on worker infrastructure while only a compact reference string is kept in the database. + +Override these four methods: + +``serialize_task_state_to_ref(*, value, key, ti_id)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.set()`` before sending the value to the Execution API. Return a reference string (e.g. an S3 key) that will be stored in the database instead of the raw value. + +``deserialize_task_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.get()`` after retrieving the reference string from the Execution API. Return the actual value. + +``serialize_asset_state_to_ref(*, value, key, asset_ref)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Same as the task variant, but for asset state.
``asset_ref`` is the asset name or URI, depending on how the accessor was constructed. + +``deserialize_asset_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``AssetStateAccessor.get()`` to resolve the stored reference back to the actual value. + +.. important:: + + **References must be deterministic.** Given the same inputs (``ti_id`` + ``key`` for task state; ``asset_ref`` + ``key`` for asset state), the serialization method must always return the same reference string. Do not embed timestamps, random UUIDs, or any other non-deterministic component in the reference path. + + When a key is deleted or cleared, Airflow clears the database reference *first*, then calls the backend's ``delete()`` or ``clear()`` method. If backend cleanup fails after the DB row is gone, the external object is orphaned. Because the reference is deterministic, a subsequent ``set()`` for the same key will overwrite the orphaned object, making the failure recoverable. A non-deterministic reference would leave the external object permanently orphaned with no way to locate it. + +Example skeleton: + +.. code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class S3StateBackend(BaseStateBackend): + + def _task_ref(self, ti_id: str, key: str) -> str: + return f"airflow/task-state/{ti_id}/{key}" + + def _asset_ref(self, asset_ref: str, key: str) -> str: + import hashlib + + safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16] + return f"airflow/asset-state/{safe}/{key}" + + def serialize_task_state_to_ref(self, *, value, key, ti_id): + s3_key = self._task_ref(ti_id, key) + s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=value.encode()) Review Comment: ```suggestion s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=json.dumps(value).encode()) ``` `serialize_task_state_to_ref` receives value: JsonValue, which can be a dict, int, list, etc. 
Calling .encode() directly on those raises AttributeError ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. 
Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +.. code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below.
+ + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. + +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. 
+ * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. Review Comment: We should list the true signatures here. As written, this is confusing. ########## airflow-core/docs/core-concepts/asset-state.rst: ########## @@ -0,0 +1,221 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:asset-state: + +Asset State +=========== + +.. versionadded:: 3.3 + +Asset state is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task state </core-concepts/task-state>`, which is tied to a single task instance, asset state persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. + +Asset state is accessed through the task context via ``context["asset_state"]``, or via the ``AssetState`` Task SDK mechanism. + + +When is ``asset_state`` available?
+------------------------------------ + +``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. It is *not* available for :class:`~airflow.sdk.definitions.asset.AssetAlias` inlets. Aliases are resolved at runtime, but asset state accessors are only created for concrete assets declared directly on the task. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. Review Comment: outlets are never added to the accessors. The outlets-only KeyError behavior is a consequence of this, not a separate known issue. ########## airflow-core/docs/core-concepts/asset-state.rst: ########## @@ -0,0 +1,221 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:asset-state: + +Asset State +=========== + +.. versionadded:: 3.3 + +Asset state is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task state </core-concepts/task-state>`, which is tied to a single task instance, asset state persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration. 
+ +Asset state is accessed through the task context via ``context["asset_state"]``, or via the ``AssetState`` Task SDK mechanism. + + +When is ``asset_state`` available? +------------------------------------ + +``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. It is *not* available for :class:`~airflow.sdk.definitions.asset.AssetAlias` inlets. Aliases are resolved at runtime, but asset state accessors are only created for concrete assets declared directly on the task. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. Review Comment: ```suggestion ``context["asset_state"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. A task must declare at least one concrete inlet or outlet for ``asset_state`` to contain any entries. ``` AssetAlias inlets are resolved and their concrete assets are accessible, but you must subscript with the concrete Asset object, not the alias. ########## airflow-core/docs/administration-and-deployment/state-store.rst: ########## @@ -0,0 +1,247 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. 
_state-store: + +State Store Configuration +========================== + +.. versionadded:: 3.3 + +The state store is the persistence layer for :doc:`task state </core-concepts/task-state>` and :doc:`asset state </authoring-and-scheduling/asset-state>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. + +Configuration reference +----------------------- + +All options live under the ``[state_store]`` section of ``airflow.cfg``. + +.. note:: + + The config section is ``[state_store]``, **not** ``[task_state]``. + +``backend`` +~~~~~~~~~~~ + +Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend. + +.. code-block:: ini + + [state_store] + backend = mypackage.state.CustomStateBackend + +``default_retention_days`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of days to retain **task state** rows after their last update. Rows older than this are deleted during the next GC pass. + +* Set to ``0`` to disable time-based cleanup entirely. +* Default: ``30``. +* This setting does **not** apply to asset state rows. + +.. code-block:: ini + + [state_store] + default_retention_days = 30 + +``clear_on_success`` +~~~~~~~~~~~~~~~~~~~~ + +When ``True``, all task state keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes). + +.. important:: + + ``clear_on_success`` clears **task state only**. It has no effect on asset state. Asset state is scoped to the asset rather than the task instance and must be cleared explicitly. + +..
code-block:: ini + + [state_store] + clear_on_success = False + +``state_cleanup_batch_size`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Number of rows deleted per batch during GC cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state`` tables to reduce lock contention. + +.. code-block:: ini + + [state_store] + state_cleanup_batch_size = 10000 + +Worker-side backend (``[workers] state_backend``) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A separate, optional config key under ``[workers]`` lets you route task state and asset state values through a worker-side backend before they reach the API server. + +.. code-block:: ini + + [workers] + state_backend = mypackage.state.S3StateBackend + +When this is set, ``TaskStateAccessor.set()`` calls ``serialize_task_state_to_ref()`` on the worker-side backend before sending the value to the Execution API, and ``get()`` calls ``deserialize_task_state_from_ref()`` after receiving the stored reference. See `Custom worker-side backends`_ below. + + +Garbage collection semantics +----------------------------- + +The GC job runs periodically and removes state rows according to the following rules: + +**Time-based expiry (task state only)** + Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. + +**``default_retention_days`` fallback (task state only)** + Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the GC job treats such rows as expiring ``default_retention_days`` days after their last update. + +**``NEVER_EXPIRE`` keys** + Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the GC to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``. 
+ +**Orphan sweep (asset state)** + Asset state rows for assets that no longer have an ``asset_active`` record are deleted during the orphan-sweep pass. This cleans up state for deactivated or renamed assets. + + +Custom backends +--------------- + +To replace the default metastore backend, subclass :class:`~airflow.sdk.state.BaseStateBackend` and implement all eight abstract methods. + +Abstract methods +~~~~~~~~~~~~~~~~ + +There are four synchronous methods and four async equivalents: + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Method + - Description + * - ``get(scope, key, *, session)`` + - Return the stored value, or ``None``. + * - ``set(scope, key, value, *, expires_at, session)`` + - Write or overwrite a key. ``expires_at`` is a UTC datetime or ``None`` + for non-expiring keys. + * - ``delete(scope, key, *, session)`` + - Delete a single key; no-op if absent. + * - ``clear(scope, *, all_map_indices, session)`` + - Delete all keys under the scope. + * - ``aget(scope, key, *, session)`` + - Async variant of ``get``. + * - ``aset(scope, key, value, *, expires_at, session)`` + - Async variant of ``set``. + * - ``adelete(scope, key, *, session)`` + - Async variant of ``delete``. + * - ``aclear(scope, *, all_map_indices, session)`` + - Async variant of ``clear``. + +Dispatching on scope type +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Each method receives a ``scope`` argument that is either a :class:`~airflow.sdk.state.TaskScope` or an :class:`~airflow.sdk.state.AssetScope`. Use a ``match`` statement to dispatch: + +.. 
code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class MyBackend(BaseStateBackend): + def get(self, scope, key, *, session=None): + if scope == TaskScope(): + return self._task_store.get(scope, key) + elif scope == AssetScope(): + return self._asset_store.get(scope, key) + +:class:`~airflow.sdk.state.AssetScope` has three optional fields: ``asset_id`` (integer, server-side only), ``name``, and ``uri``. At least one must be set. Server-side operations (REST API calls) provide ``asset_id``. Worker-side operations provide ``name`` or ``uri`` (workers do not have access to the integer ``asset_id``). + +Configure the class via ``[state_store] backend``: + +.. code-block:: ini + + [state_store] + backend = mypackage.state.MyBackend + + +Custom worker-side backends +---------------------------- + +Worker-side backends extend ``BaseStateBackend`` with two pairs of serialization hooks. They are configured separately via ``[workers] state_backend`` and run *on the worker process*, not on the API server. This lets you store large payloads or credentialed data directly on worker infrastructure while only a compact reference string is kept in the database. + +Override these four methods: + +``serialize_task_state_to_ref(*, value, key, ti_id)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.set()`` before sending the value to the Execution API. Return a reference string (e.g. an S3 key) that will be stored in the database instead of the raw value. + +``deserialize_task_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``TaskStateAccessor.get()`` after retrieving the reference string from the Execution API. Return the actual value. + +``serialize_asset_state_to_ref(*, value, key, asset_ref)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Same as the task variant, but for asset state. 
``asset_ref`` is the asset name or URI, depending on how the accessor was constructed. + +``deserialize_asset_state_from_ref(stored)`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Called by ``AssetStateAccessor.get()`` to resolve the stored reference back to the actual value. + +.. important:: + + **References must be deterministic.** Given the same inputs (``ti_id`` + ``key`` for task state; ``asset_ref`` + ``key`` for asset state), the serialization method must always return the same reference string. Do not embed timestamps, random UUIDs, or any other non-deterministic component in the reference path. + + When a key is deleted or cleared, Airflow clears the database reference *first*, then calls the backend's ``delete()`` or ``clear()`` method. If backend cleanup fails after the DB row is gone, the external object is orphaned. Because the reference is deterministic, a subsequent ``set()`` for the same key will overwrite the orphaned object, making the failure recoverable. A non-deterministic reference would leave the external object permanently orphaned with no way to locate it. + +Example skeleton: + +.. code-block:: python + + from airflow.sdk.state import BaseStateBackend, TaskScope, AssetScope + + + class S3StateBackend(BaseStateBackend): Review Comment: Type hints for methods pls
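Taken together, the review comments on this skeleton (type hints, `JsonValue` values rather than plain strings, deterministic references) point toward roughly the following shape. This is a minimal, self-contained sketch, not the PR's code: it does not subclass `airflow.sdk.state.BaseStateBackend` so that it runs without Airflow installed, a plain dict stands in for the S3 bucket, and the local `JsonValue` alias and the class name `InMemoryRefBackend` are assumptions for illustration. Only the four hook names and their keyword arguments come from the quoted docs.

```python
import hashlib
import json
from typing import Any, Union

# Assumption: a local stand-in for the JsonValue type mentioned in review;
# the real alias used by Airflow may be defined differently.
JsonValue = Union[str, int, float, bool, None, list[Any], dict[str, Any]]


class InMemoryRefBackend:
    """Hypothetical sketch of the four worker-side hooks.

    A dict mapping reference string -> raw bytes stands in for S3.
    """

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    @staticmethod
    def _task_ref(ti_id: str, key: str) -> str:
        # Deterministic: the same (ti_id, key) always yields the same path.
        return f"airflow/task-state/{ti_id}/{key}"

    @staticmethod
    def _asset_ref(asset_ref: str, key: str) -> str:
        # Hash the asset name/URI so it is safe to use as a path segment.
        safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16]
        return f"airflow/asset-state/{safe}/{key}"

    def serialize_task_state_to_ref(
        self, *, value: JsonValue, key: str, ti_id: str
    ) -> str:
        ref = self._task_ref(ti_id, key)
        # json.dumps handles dicts, ints, lists, None; value.encode() would not.
        self._objects[ref] = json.dumps(value).encode()
        return ref

    def deserialize_task_state_from_ref(self, stored: str) -> JsonValue:
        return json.loads(self._objects[stored].decode())

    def serialize_asset_state_to_ref(
        self, *, value: JsonValue, key: str, asset_ref: str
    ) -> str:
        ref = self._asset_ref(asset_ref, key)
        self._objects[ref] = json.dumps(value).encode()
        return ref

    def deserialize_asset_state_from_ref(self, stored: str) -> JsonValue:
        return json.loads(self._objects[stored].decode())
```

Because the reference depends only on its inputs, writing the same key twice overwrites one object instead of leaving the first one orphaned, which is the property the "References must be deterministic" note relies on.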
