This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 61e2f22d8a2 Document what happens when task without inlets / outlets 
accesses `asset_state_store` (#68886)
61e2f22d8a2 is described below

commit 61e2f22d8a238fa32dcc3c41aee59e462135029b
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 23 14:28:00 2026 +0530

    Document what happens when task without inlets / outlets accesses 
`asset_state_store` (#68886)
---
 .../docs/core-concepts/asset-state-store.rst       | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/airflow-core/docs/core-concepts/asset-state-store.rst 
b/airflow-core/docs/core-concepts/asset-state-store.rst
index d3dec61f956..1f019d89d22 100644
--- a/airflow-core/docs/core-concepts/asset-state-store.rst
+++ b/airflow-core/docs/core-concepts/asset-state-store.rst
@@ -37,6 +37,30 @@ When is ``asset_state_store`` available?
 
 When using asset state store within a task, ``context["asset_state_store"]`` 
is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset` 
inlets and outlets. A task must declare at least one concrete inlet or outlet 
for ``asset_state_store`` to contain any entries.
 
+If a task has no inlets or outlets and accesses 
``context["asset_state_store"]``, a ``KeyError`` is raised at runtime. Declare 
at least one asset inlet or outlet on the task to use asset state store.
+
+This applies to both the ``@asset`` pattern and the ``@task`` pattern:
+
+.. code-block:: python
+
+    from airflow.sdk import Asset, DAG, asset, task
+
+    my_asset = Asset("my_data", uri="s3://bucket/my_data")
+
+
+    # @asset Dags implicitly declare the asset as an outlet
+    @asset
+    def my_asset_dag(**context):
+        context["asset_state_store"].set("watermark", "2024-06-01")
+
+
+    # @task within a @dag requires explicit inlets or outlets
+    with DAG("example", schedule=None):
+
+        @task(outlets=[my_asset])
+        def my_task(**context):
+            context["asset_state_store"].set("watermark", "2024-06-01")
+
 Accessing asset state store using ``context``
 ---------------------------------------------
 

Reply via email to