amoghrajesh commented on code in PR #67299:
URL: https://github.com/apache/airflow/pull/67299#discussion_r3360468590


##########
airflow-core/docs/core-concepts/task-store.rst:
##########
@@ -0,0 +1,276 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:task-store:
+
+.. spelling:word-list::
+
+   intra
+   Intra
+   checkpointing
+
+Task Store
+==========
+
+.. versionadded:: 3.3
+
+Task store is a persistent key/value store scoped to a single task instance (``dag_id`` + ``run_id`` + ``task_id`` + ``map_index``). It survives worker crashes and task retries within the same Dag run, making it suitable for storing external job IDs, intra-task checkpoints, and progress metadata.
+
+Task store is accessed through the task context via ``context["task_store"]`` and exposes four methods: ``get``, ``set``, ``delete``, and ``clear``.
+
+
+Accessing task store
+--------------------
+
+Inside any ``@task``-decorated function or ``BaseOperator.execute()`` method, task store is available in the ``context`` dictionary under the ``task_store`` key. From there, you can retrieve, set, or delete the value for a single key, or clear every key for the task instance. In this example, the value stored under ``my_key`` is retrieved, replaced with a new value, and then deleted. Finally, ``clear`` removes all remaining data for the task instance.
+
+.. code-block:: python
+
+    import random
+
+    from airflow.sdk import task
+
+
+    @task
+    def my_task(**context):
+        # Retrieve task_store from context
+        task_store = context["task_store"]
+        my_value = task_store.get("my_key", default="my_default_value")
+
+        # Set the new value (randint is inclusive, so this yields 1 to 12)
+        new_value = f"It is {random.randint(1, 12)} o'clock"
+        task_store.set("my_key", new_value)
+
+        # Delete the value
+        task_store.delete("my_key")
+
+        # Clear all store entries for the task
+        task_store.clear()
+
+Reference
+---------
+
+``get(key, default)``
+~~~~~~~~~~~~~~~~~~~~~~
+
+Returns the stored string value, or the ``default`` value if the key does not exist.

Review Comment:
   ```suggestion
   Returns the stored JSON value, or the ``default`` value if the key does not exist.
   ```
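For illustration only, a minimal sketch of the semantics this suggestion describes: values round-trip as JSON, and a missing key returns the caller-supplied ``default``. ``InMemoryTaskStore`` is a hypothetical stand-in, not Airflow's accessor, and storing values as JSON text is an assumption about the backend.

```python
import json
from typing import Any


class InMemoryTaskStore:
    """Hypothetical stand-in for context["task_store"]; not Airflow's implementation."""

    def __init__(self) -> None:
        # Values are kept as JSON strings to mimic a JSON-serializing backend.
        self._rows: dict[str, str] = {}

    def get(self, key: str, default: Any = None) -> Any:
        # A missing key returns the caller-supplied default.
        if key not in self._rows:
            return default
        return json.loads(self._rows[key])

    def set(self, key: str, value: Any) -> None:
        # json.dumps raises TypeError for values that are not JSON-serializable.
        self._rows[key] = json.dumps(value)

    def delete(self, key: str) -> None:
        self._rows.pop(key, None)

    def clear(self) -> None:
        self._rows.clear()


store = InMemoryTaskStore()
store.set("progress", {"rows": 120, "done": False})
print(store.get("progress"))            # {'rows': 120, 'done': False}
print(store.get("missing", default=0))  # 0
```

Under these assumptions, a non-serializable value such as a Python ``set`` fails at ``set()`` time with ``TypeError`` rather than surfacing later at ``get()`` time.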



##########
airflow-core/docs/administration-and-deployment/task-and-asset-store.rst:
##########
@@ -0,0 +1,210 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _task-and-asset-store:
+
+Task and Asset Store Configuration
+====================================
+
+.. versionadded:: 3.3
+
+The task and asset store is the persistence layer for :doc:`task store </core-concepts/task-store>` and :doc:`asset store </core-concepts/asset-store>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend.
+
+Configuration reference
+-----------------------
+
+All options live under the ``[state_store]`` section of ``airflow.cfg``.
+
+.. note::
+
+   The config section is ``[state_store]``, **not** ``[task_store]``.
+
+``backend``
+~~~~~~~~~~~
+
+Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStateBackend`. Defaults to the built-in metastore backend.
+
+.. code-block:: ini
+
+    [state_store]
+    backend = mypackage.state.CustomStateBackend
+
+``default_retention_days``
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of days to retain **task store** rows after their last update. Rows older than this are deleted during the next garbage collection pass.

Review Comment:
   ```suggestion
   Number of days after which task store rows expire. When a key is written with no explicit retention, ``expires_at`` is computed on the worker as ``now + default_retention_days``. Changing this setting does not affect already-written rows.
   ```
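A minimal sketch of the write-time computation described in this suggestion. ``compute_expires_at`` is a hypothetical helper, an explicit per-key ``retention`` is modelled as a ``timedelta`` (an assumption), and mapping ``default_retention_days = 0`` to "never expires" follows the "Set to 0 to disable time-based cleanup" bullet in this file rather than confirmed behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_expires_at(
    now: datetime,
    default_retention_days: int,
    retention: Optional[timedelta] = None,
) -> Optional[datetime]:
    """Hypothetical worker-side helper: compute expires_at once, at write time."""
    if retention is not None:
        # An explicit per-key retention takes precedence over the global default.
        return now + retention
    if default_retention_days <= 0:
        # Assumption: 0 disables time-based cleanup, so the row never expires.
        return None
    return now + timedelta(days=default_retention_days)


written_at = datetime(2025, 1, 1, tzinfo=timezone.utc)
expires_at = compute_expires_at(written_at, default_retention_days=30)
print(expires_at)  # 2025-01-31 00:00:00+00:00
```

Because the result is stored on the row, lowering ``default_retention_days`` later only affects rows written afterwards, which is the point the suggestion makes.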



##########
airflow-core/docs/administration-and-deployment/task-and-asset-store.rst:
##########
@@ -0,0 +1,210 @@
+``default_retention_days``
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of days to retain **task store** rows after their last update. Rows older than this are deleted during the next garbage collection pass.
+
+* Set to ``0`` to disable time-based cleanup entirely.
+* Default: ``30``.
+* This setting does **not** apply to asset store rows.
+
+.. code-block:: ini
+
+    [state_store]
+    default_retention_days = 30
+
+``clear_on_success``
+~~~~~~~~~~~~~~~~~~~~
+
+When ``True``, all task store keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task store entries after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes).
+
+.. important::
+
+   ``clear_on_success`` clears **task store only**. It has no effect on asset store. Asset store is scoped to the asset rather than the task instance and must be cleared explicitly.
+
+.. code-block:: ini
+
+    [state_store]
+    clear_on_success = False
+
+``state_cleanup_batch_size``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Number of rows deleted per batch during garbage collection. Set to ``0`` (the default) to delete all matching rows in a single statement. On deployments with large ``task_store`` tables, set a positive value to reduce lock contention.
+
+.. code-block:: ini
+
+    [state_store]
+    state_cleanup_batch_size = 10000
+
+.. _task-and-asset-store:worker-backends:
+
+Worker-side backend (``[workers] state_backend``)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A separate, optional config key under ``[workers]`` lets you route task store and asset store values through a worker-side backend before they reach the API server.
+
+.. code-block:: ini
+
+    [workers]
+    state_backend = mypackage.state.S3StateBackend
+
+When this is set, ``TaskStoreAccessor.set()`` calls ``serialize_task_store_to_ref()`` on the worker-side backend before sending the returned value (a reference to the actual storage) to the Execution API, and ``get()`` calls ``deserialize_task_store_from_ref()`` after receiving the stored reference from the Execution API. See `Custom worker-side backends`_ below.
+
+
+Garbage collection semantics
+-----------------------------
+
+Garbage collection (the cleanup task) is triggered from the Airflow CLI with ``airflow state-store cleanup-task-store``. It removes store rows according to the following rules:
+
+**Time-based expiry (task store only)**
+  Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server.
+
+``default_retention_days`` **fallback (task store only)**
+  Keys written with no explicit ``retention`` (i.e. ``expires_at`` is ``NULL``) are governed by the global ``default_retention_days`` setting. When this setting is positive, the garbage collection job treats such rows as expiring ``default_retention_days`` days after their last update.

Review Comment:
   ```suggestion
     Keys written with no explicit ``retention`` get an ``expires_at`` of ``now + default_retention_days``, computed at write time. Garbage collection deletes rows where ``expires_at < now()``.
   ```
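To illustrate the deletion rule in this suggestion together with the ``state_cleanup_batch_size`` semantics described earlier in the same file, a minimal in-memory sketch. ``expired_ids`` and ``delete_in_batches`` are hypothetical helpers operating on ``(row_id, expires_at)`` tuples rather than the real ``task_store`` table, and treating ``expires_at = None`` as "never expires" is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional

# Hypothetical row shape: (row_id, expires_at); None is assumed to mean "never expires".
Row = tuple[int, Optional[datetime]]


def expired_ids(rows: list[Row], now: datetime) -> list[int]:
    # Only rows with a concrete expires_at in the past are eligible for deletion.
    return [row_id for row_id, expires_at in rows if expires_at is not None and expires_at < now]


def delete_in_batches(ids: list[int], batch_size: int) -> list[list[int]]:
    """Group ids into delete batches; batch_size 0 means one single statement."""
    if batch_size <= 0:
        return [ids] if ids else []
    return [ids[i : i + batch_size] for i in range(0, len(ids), batch_size)]


now = datetime(2025, 2, 1, tzinfo=timezone.utc)
rows = [
    (1, datetime(2025, 1, 15, tzinfo=timezone.utc)),  # expired
    (2, datetime(2025, 3, 1, tzinfo=timezone.utc)),   # still live
    (3, None),                                        # no expiry
    (4, datetime(2025, 1, 31, tzinfo=timezone.utc)),  # expired
]
ids = expired_ids(rows, now)
print(ids)                        # [1, 4]
print(delete_in_batches(ids, 1))  # [[1], [4]]
print(delete_in_batches(ids, 0))  # [[1, 4]]
```

Smaller batches mean more, shorter delete statements, which is the lock-contention trade-off the ``state_cleanup_batch_size`` description mentions.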



##########
airflow-core/docs/core-concepts/asset-store.rst:
##########
@@ -0,0 +1,207 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _concepts:asset-store:
+
+.. spelling:word-list::
+
+   subscripted
+   subscripting
+
+Asset Store
+===========
+
+.. versionadded:: 3.3
+
+Asset store is a persistent key/value store scoped to an *asset*, independent of any particular DAG run. Unlike :doc:`task store </core-concepts/task-store>`, which is tied to a single task instance, asset store persists across runs and is logically owned by the asset itself. It is the natural home for cross-run metadata such as watermarks, incremental-load cursors, and per-asset configuration.
+
+Asset store is accessed through the task context via ``context["asset_store"]``.
+
+
+When is ``asset_store`` available?
+------------------------------------
+
+When using asset store within a task, ``context["asset_store"]`` is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` inlets and outlets. A task must declare at least one concrete inlet or outlet for ``asset_store`` to contain any entries.
+
+In a ``BaseEventTrigger``, asset store is available through the ``self.asset_store`` attribute, which can be subscripted in the same way as ``context["asset_store"]``.
+
+.. warning::
+
+   **Outlets-only tasks**: if a task declares only ``outlets`` (no ``inlets``), ``context["asset_store"][my_asset]`` may raise a ``KeyError`` at runtime. The workaround is to declare the asset in **both** ``inlets`` and ``outlets``.
+
+   .. code-block:: python
+
+       from airflow.sdk import Asset, task
+
+       my_asset = Asset("my_data", uri="s3://bucket/my_data")
+
+
+       @task(inlets=[my_asset], outlets=[my_asset])
+       def write_asset(**context):
+           context["asset_store"][my_asset].set("watermark", "2024-01-01")
+
+   This known issue will be resolved in a future release.
+
+
+Accessing asset store using ``context``
+---------------------------------------
+
+An asset becomes available through ``context["asset_store"]`` when it is included in ``inlets`` (or in both ``inlets`` and ``outlets``). You can then retrieve its asset store by subscripting ``context["asset_store"]`` with the asset object.
+
+.. code-block:: python
+
+    from airflow.sdk import Asset, DAG, task
+
+    my_asset = Asset("my_data", uri="s3://bucket/my_data")
+
+    with DAG("example_asset_store", schedule=None):
+
+        @task(inlets=[my_asset], outlets=[my_asset])
+        def process(**context):
+            asset_store = context["asset_store"][my_asset]
+            watermark = asset_store.get("watermark")
+            asset_store.set("watermark", "2024-06-01")
+
+        process()
+
+To see asset store in action in a real DAG, check out `example_asset_store.py <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_asset_store.py>`_.
+
+Single-inlet shorthand
+~~~~~~~~~~~~~~~~~~~~~~~
+
+For tasks with exactly **one** concrete inlet, you can call ``get``, ``set``, ``delete``, and ``clear`` directly on ``context["asset_store"]`` without subscripting.
+
+.. code-block:: python
+
+    @task(inlets=[my_asset], outlets=[my_asset])
+    def process_single(**context):
+        asset_store = context["asset_store"]
+        watermark = asset_store.get("watermark")
+        asset_store.set("watermark", "2024-06-01")
+
+If the task has more than one concrete inlet, calling the shorthand raises a ``ValueError``. Use the subscript form (``context["asset_store"][my_asset]``) whenever a task has multiple inlets.
+
+
+API reference
+-------------
+
+The following methods are available on the per-asset accessor (``context["asset_store"][my_asset]``), on the shorthand (``context["asset_store"]``) when the task has exactly one inlet, and through the ``self.asset_store`` attribute in a ``BaseEventTrigger``.
+
+``get(key, default)``
+~~~~~~~~~~~~~~~~~~~~~~
+
+Returns the stored JSON value, or ``None`` if the key does not exist.

Review Comment:
   ```suggestion
   Returns the stored JSON value, or the ``default`` value if the key does not exist.
   ```
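A minimal sketch of the accessor behavior described in this file: per-asset subscripting, ``default`` on missing keys, and the single-inlet shorthand raising ``ValueError`` when the inlet is ambiguous. ``AssetStoreAccessor`` and ``AssetStoreMapping`` are hypothetical in-memory stand-ins keyed by asset name rather than by ``Asset`` object; they are not Airflow's implementation.

```python
from typing import Any


class AssetStoreAccessor:
    """Hypothetical per-asset accessor mirroring get/set/delete/clear."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)

    def clear(self) -> None:
        self._data.clear()


class AssetStoreMapping:
    """Hypothetical stand-in for context["asset_store"], keyed by asset name."""

    def __init__(self, inlets: list[str]) -> None:
        self._inlets = list(inlets)
        self._stores = {name: AssetStoreAccessor() for name in self._inlets}

    def __getitem__(self, asset: str) -> AssetStoreAccessor:
        # Undeclared assets raise KeyError, as with a plain dict lookup.
        return self._stores[asset]

    def _single(self) -> AssetStoreAccessor:
        # Shorthand is only unambiguous with exactly one inlet (assumption: zero also raises).
        if len(self._inlets) != 1:
            raise ValueError(f"Expected exactly one inlet, got {len(self._inlets)}; subscript by asset instead")
        return self._stores[self._inlets[0]]

    def get(self, key: str, default: Any = None) -> Any:
        return self._single().get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._single().set(key, value)

    def delete(self, key: str) -> None:
        self._single().delete(key)

    def clear(self) -> None:
        self._single().clear()


single = AssetStoreMapping(["my_data"])
single.set("watermark", "2024-06-01")
print(single["my_data"].get("watermark"))  # 2024-06-01
print(single.get("cursor", default=0))     # 0

multi = AssetStoreMapping(["a", "b"])
try:
    multi.get("watermark")
except ValueError as exc:
    print(exc)
```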


