This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 61e2f22d8a2 Document what happens when task without inlets / outlets
accesses `asset_state_store` (#68886)
61e2f22d8a2 is described below
commit 61e2f22d8a238fa32dcc3c41aee59e462135029b
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 23 14:28:00 2026 +0530
Document what happens when task without inlets / outlets accesses
`asset_state_store` (#68886)
---
.../docs/core-concepts/asset-state-store.rst | 24 ++++++++++++++++++++++
1 file changed, 24 insertions(+)
diff --git a/airflow-core/docs/core-concepts/asset-state-store.rst
b/airflow-core/docs/core-concepts/asset-state-store.rst
index d3dec61f956..1f019d89d22 100644
--- a/airflow-core/docs/core-concepts/asset-state-store.rst
+++ b/airflow-core/docs/core-concepts/asset-state-store.rst
@@ -37,6 +37,30 @@ When is ``asset_state_store`` available?
When using asset state store within a task, ``context["asset_state_store"]``
is populated for **concrete** :class:`~airflow.sdk.definitions.asset.Asset`
inlets and outlets. A task must declare at least one concrete inlet or outlet
for ``asset_state_store`` to contain any entries.
+If a task has no inlets or outlets and accesses
``context["asset_state_store"]``, a ``KeyError`` is raised at runtime. Declare
at least one asset inlet or outlet on the task to use asset state store.
+
+This applies to both the ``@asset`` pattern and the ``@task`` pattern:
+
+.. code-block:: python
+
+ from airflow.sdk import Asset, DAG, asset, task
+
+ my_asset = Asset("my_data", uri="s3://bucket/my_data")
+
+
+ # @asset Dags implicitly declare the asset as an outlet
+ @asset
+ def my_asset_dag(**context):
+ context["asset_state_store"].set("watermark", "2024-06-01")
+
+
+ # @task within a @dag requires explicit inlets or outlets
+ with DAG("example", schedule=None):
+
+ @task(outlets=[my_asset])
+ def my_task(**context):
+ context["asset_state_store"].set("watermark", "2024-06-01")
+
Accessing asset state store using ``context``
---------------------------------------------