amoghrajesh commented on code in PR #68900:
URL: https://github.com/apache/airflow/pull/68900#discussion_r3472087663
##########
airflow-core/docs/core-concepts/asset-state-store.rst:
##########
@@ -150,6 +150,80 @@ Deletes *all* asset state store keys for the asset.
# Using context
context["asset_state_store"][my_asset].clear()
+Using ``asset_state_store`` inside a Watcher Trigger
+-----------------------------------------------------
+
+:class:`~airflow.triggers.base.BaseEventTrigger` subclasses (watcher triggers)
can read and write asset state store directly from within ``run()``. The
triggerer injects ``self.asset_state_store`` before ``run()`` is called, scoped
to the asset the trigger is watching.
Review Comment:
```suggestion
:class:`~airflow.triggers.base.BaseEventTrigger` subclasses (watcher
triggers) can read and write asset state store directly from within ``run()``.
The triggerer injects ``self.asset_state_store`` before ``run()`` is called,
scoped to the asset the trigger is watching. It is not available during
``__init__`` or ``serialize()``; access it only from within ``run()``.
```
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py:
##########
@@ -130,18 +132,31 @@ def _put_asset_state_store(
) -> None:
backend = get_state_backend()
if isinstance(backend, MetastoreBackend):
- dag_id, run_id, task_id, map_index = _fetch_ti_writer_fields(token,
session)
- backend.set_asset_state_store(
- scope,
- key,
- json.dumps(body.value),
- kind=AssetStateStoreWriterKind.TASK,
- dag_id=dag_id,
- run_id=run_id,
- task_id=task_id,
- map_index=map_index,
- session=session,
- )
+ if token.id == NULL_UUID:
+ # A watcher trigger writing via the triggerer's in-process client
has a null-UUID
Review Comment:
```suggestion
# Since the asset state store routes do not have `task_instance_id` in
# their path params, token.id falls back to its default, NULL_UUID
# ("00000000-0000-0000-0000-000000000000").
```
##########
task-sdk/tests/task_sdk/execution_time/test_request_handlers.py:
##########
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.sdk.api import client as sdk_client
+from airflow.sdk.api.datamodels._generated import AssetStateStoreResponse
+from airflow.sdk.exceptions import ErrorType
+from airflow.sdk.execution_time.comms import (
+ AssetStateStoreResult,
+ ClearAssetStateStoreByName,
+ ClearAssetStateStoreByUri,
+ DeleteAssetStateStoreByName,
+ DeleteAssetStateStoreByUri,
+ ErrorResponse,
+ GetAssetStateStoreByName,
+ GetAssetStateStoreByUri,
+ SetAssetStateStoreByName,
+ SetAssetStateStoreByUri,
+)
+from airflow.sdk.execution_time.request_handlers import (
+ handle_clear_asset_state_store_by_name,
+ handle_clear_asset_state_store_by_uri,
+ handle_delete_asset_state_store_by_name,
+ handle_delete_asset_state_store_by_uri,
+ handle_get_asset_state_store_by_name,
+ handle_get_asset_state_store_by_uri,
+ handle_set_asset_state_store_by_name,
+ handle_set_asset_state_store_by_uri,
+)
+
+
[email protected]
+def client():
+ return MagicMock(spec=sdk_client.Client)
+
+
+def test_get_asset_state_store_by_name_wraps_response_as_result(client):
+ client.asset_state_store.get.return_value =
AssetStateStoreResponse(value="2026-01-01")
+
+ result, dump_opts = handle_get_asset_state_store_by_name(
+ client, GetAssetStateStoreByName(name="asset_a", key="watermark")
+ )
+
+ client.asset_state_store.get.assert_called_once_with(key="watermark",
name="asset_a")
+ assert result == AssetStateStoreResult(value="2026-01-01")
+ assert dump_opts == {}
+
+
+def test_get_asset_state_store_by_name_passes_through_error_response(client):
+ err = ErrorResponse(error=ErrorType.ASSET_STORE_NOT_FOUND, detail={"key":
"watermark"})
+ client.asset_state_store.get.return_value = err
+
+ result, dump_opts = handle_get_asset_state_store_by_name(
+ client, GetAssetStateStoreByName(name="asset_a", key="watermark")
+ )
+
+ assert result is err
+ assert dump_opts == {}
+
+
+def test_get_asset_state_store_by_uri_wraps_response_as_result(client):
+ client.asset_state_store.get.return_value =
AssetStateStoreResponse(value="2026-01-01")
+
+ result, dump_opts = handle_get_asset_state_store_by_uri(
+ client, GetAssetStateStoreByUri(uri="s3://bucket/a", key="watermark")
+ )
+
+ client.asset_state_store.get.assert_called_once_with(key="watermark",
uri="s3://bucket/a")
+ assert result == AssetStateStoreResult(value="2026-01-01")
+ assert dump_opts == {}
+
+
+def test_get_asset_state_store_by_uri_passes_through_error_response(client):
+ err = ErrorResponse(error=ErrorType.ASSET_STORE_NOT_FOUND, detail={"key":
"watermark"})
+ client.asset_state_store.get.return_value = err
+
+ result, dump_opts = handle_get_asset_state_store_by_uri(
+ client, GetAssetStateStoreByUri(uri="s3://bucket/a", key="watermark")
+ )
+
+ assert result is err
+ assert dump_opts == {}
+
+
[email protected](
+ ("handler", "msg", "call_kwargs", "method"),
+ [
+ (
+ handle_set_asset_state_store_by_name,
+ SetAssetStateStoreByName,
+ {
+ "name": "asset_a",
+ "key": "watermark",
+ "value": "2026-01-01",
+ },
+ "set",
+ ),
+ (
+ handle_set_asset_state_store_by_uri,
+ SetAssetStateStoreByUri,
+ {
+ "uri": "s3://bucket/a",
+ "key": "watermark",
+ "value": "2026-01-01",
+ },
+ "set",
+ ),
+ (
+ handle_delete_asset_state_store_by_name,
+ DeleteAssetStateStoreByName,
+ {
+ "name": "asset_a",
+ "key": "watermark",
+ },
+ "delete",
+ ),
+ (
+ handle_delete_asset_state_store_by_uri,
+ DeleteAssetStateStoreByUri,
+ {
+ "uri": "s3://bucket/a",
+ "key": "watermark",
+ },
+ "delete",
+ ),
+ (
+ handle_clear_asset_state_store_by_name,
+ ClearAssetStateStoreByName,
+ {
+ "name": "asset_a",
+ },
+ "clear",
+ ),
+ (
+ handle_clear_asset_state_store_by_uri,
+ ClearAssetStateStoreByUri,
+ {
+ "uri": "s3://bucket/a",
+ },
+ "clear",
+ ),
+ ],
+)
+def test_asset_store_store_delegates_to_client(client, handler, msg,
call_kwargs, method):
Review Comment:
```suggestion
def test_asset_store_delegates_to_client(client, handler, msg, call_kwargs,
method):
```
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py:
##########
@@ -130,18 +132,31 @@ def _put_asset_state_store(
) -> None:
backend = get_state_backend()
if isinstance(backend, MetastoreBackend):
- dag_id, run_id, task_id, map_index = _fetch_ti_writer_fields(token,
session)
- backend.set_asset_state_store(
- scope,
- key,
- json.dumps(body.value),
- kind=AssetStateStoreWriterKind.TASK,
- dag_id=dag_id,
- run_id=run_id,
- task_id=task_id,
- map_index=map_index,
- session=session,
- )
+ if token.id == NULL_UUID:
+ # A watcher trigger writing via the triggerer's in-process client
has a null-UUID
+ backend.set_asset_state_store(
+ scope,
+ key,
+ json.dumps(body.value),
+ kind=AssetStateStoreWriterKind.WATCHER,
+ session=session,
+ )
+ else:
+ # Retrieve TI fields, since token.id is not NULL_UUID
Review Comment:
```suggestion
```
##########
airflow-core/docs/core-concepts/asset-state-store.rst:
##########
@@ -150,6 +150,80 @@ Deletes *all* asset state store keys for the asset.
# Using context
context["asset_state_store"][my_asset].clear()
+Using ``asset_state_store`` inside a Watcher Trigger
+-----------------------------------------------------
+
+:class:`~airflow.triggers.base.BaseEventTrigger` subclasses (watcher triggers)
can read and write asset state store directly from within ``run()``. The
triggerer injects ``self.asset_state_store`` before ``run()`` is called, scoped
to the asset the trigger is watching.
+
+Unlike task-based access (where the asset is identified by an inlet or outlet
declaration), the accessor in a watcher trigger is automatically bound to the
watched asset, so no subscripting is needed.
+
+.. code-block:: python
+
+ import asyncio
+ from collections.abc import AsyncIterator
+ from typing import Any
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+ class PollEventsTrigger(BaseEventTrigger):
+ def __init__(self, source: str, waiter_delay: int, **kwargs):
+ super().__init__(**kwargs)
+ self.source = source
+ self.waiter_delay = waiter_delay
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ return (
+ f"{self.__class__.__module__}.{self.__class__.__qualname__}",
+ {"source": self.source, "waiter_delay": self.waiter_delay},
+ )
+
+ def _poll_for_new_record(self, source: str, last_seen: str) -> str |
None:
+ ... # Add logic for polling a certain source
+ return None
+
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ while True:
+ last_seen = self.asset_state_store.get("last_seen_id",
default=0)
+ new_id = self._poll_for_new_record(
+ source=self.source,
+ last_seen=last_seen,
+ )
+
+ if new_id is not None:
+ self.asset_state_store.set("last_seen_id", new_id)
+ yield TriggerEvent({"status": "success", "record_id":
new_id})
+ return
+
+ await asyncio.sleep(self.waiter_delay)
+
+The corresponding :class:`~airflow.sdk.definitions.asset.AssetWatcher` wires
the trigger to the asset:
+
+.. code-block:: python
+
+ from airflow.sdk import Asset, AssetWatcher
+
+ # Import trigger from internal module (wherever it was written)
Review Comment:
```suggestion
from my_dag.triggers import PollEventsTrigger
```
That comment is too informal; it reads like a TODO, which is not something we want in the docs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]