amoghrajesh opened a new pull request, #66160:
URL: https://github.com/apache/airflow/pull/66160

   closes: https://github.com/apache/airflow/issues/65779
   
   This PR is part of AIP-103 (Task State Management) and is the third in the 
series. It adds the Task SDK layer: comms message types, supervisor proxy, and 
context accessors that wire `context['task_state']` and 
`context['asset_state']` into task execution.
   
   ## What does the PR have?
   
   **Comms layer** (`comms.py`):
   - `GetTaskState`, `SetTaskState`, `DeleteTaskState`, `ClearTaskState` (with 
`all_map_indices: bool` field)
   - `GetAssetStateByName`/`GetAssetStateByUri`, 
`SetAssetStateByName`/`SetAssetStateByUri`, 
`DeleteAssetStateByName`/`DeleteAssetStateByUri`, 
`ClearAssetStateByName`/`ClearAssetStateByUri` — separate typed classes per 
addressing mode, matching the `GetAssetByName`/`GetAssetByUri` convention
   - `GetAssetsByAlias` + `AssetsByAliasResult` — resolves an `AssetAlias` 
inlet to its concrete assets at context build time
   - `TaskStateResult`, `AssetStateResult` result types
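
To make the "separate typed classes per addressing mode" convention concrete, here is a minimal sketch of how such messages could be shaped. It uses stdlib dataclasses rather than the SDK's real base classes. Only the class names and the `all_map_indices: bool` field come from this PR; the `key`, `name`, `uri`, and `type` fields and the `build_get_asset_state` helper are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True)
class ClearTaskState:
    # `all_map_indices` is named in the PR; defaulting to False mirrors the opt-in design.
    all_map_indices: bool = False
    type: Literal["ClearTaskState"] = "ClearTaskState"  # assumed discriminator field


@dataclass(frozen=True)
class GetAssetStateByName:
    name: str  # assumed field
    key: str  # assumed field
    type: Literal["GetAssetStateByName"] = "GetAssetStateByName"


@dataclass(frozen=True)
class GetAssetStateByUri:
    uri: str  # assumed field
    key: str  # assumed field
    type: Literal["GetAssetStateByUri"] = "GetAssetStateByUri"


def build_get_asset_state(
    key: str, *, name: str | None = None, uri: str | None = None
) -> GetAssetStateByName | GetAssetStateByUri:
    """Hypothetical helper: pick the typed message for the addressing mode."""
    if (name is None) == (uri is None):
        raise ValueError("pass exactly one of name or uri")
    if name is not None:
        return GetAssetStateByName(name=name, key=key)
    return GetAssetStateByUri(uri=uri, key=key)
```

Keeping one class per addressing mode means each message carries exactly the fields it needs, so a handler can dispatch on the message type alone.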
   
   **Supervisor** (`supervisor.py`): handler branches proxying the above 
messages to the Execution API endpoints from PR #66073.
   
   **Client** (`client.py`): `TaskStateOperations` and `AssetStateOperations` 
classes exposed as `client.task_state` and `client.asset_state`. 
`AssetStateOperations` has a `_resolve_endpoint` helper that builds the 
`by-name/{op}` or `by-uri/{op}` endpoint + params, keeping 
`get`/`set`/`delete`/`clear` each a one-liner. `AssetOperations.get_by_alias()` 
resolves alias → concrete assets.
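
The idea behind `_resolve_endpoint` can be sketched roughly as below. This is not the code from the PR: the function name, the keyword-only `name`/`uri` arguments, the query parameter names, and the `"get"` operation string used in the usage note are assumptions. Only the `by-name/{op}` and `by-uri/{op}` shapes come from the description above.

```python
from __future__ import annotations


def resolve_endpoint(
    op: str, *, name: str | None = None, uri: str | None = None
) -> tuple[str, dict[str, str]]:
    """Return (relative endpoint, query params) for one asset-state operation."""
    # Exactly one addressing mode must be chosen.
    if (name is None) == (uri is None):
        raise ValueError("pass exactly one of name or uri")
    if name is not None:
        return f"by-name/{op}", {"name": name}
    assert uri is not None  # guaranteed by the check above
    return f"by-uri/{op}", {"uri": uri}
```

With a helper like this, a call such as `resolve_endpoint("get", name="test_asset")` yields the endpoint suffix and params in one step, which is what lets each of `get`/`set`/`delete`/`clear` stay a one-liner.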
   
   **Execution API** (`routes/assets.py`): `GET 
/assets/by-alias?alias_name=...` wrapping the existing 
`expand_alias_to_assets()` DB function. Cadwyn migration added in 
`v2026_04_17.py`.
   
   **Context accessors** (`context.py`):
   - `TaskStateAccessor` — always available as `context['task_state']`
   - `AssetStateAccessors` container + `AssetStateAccessor` (per-asset):
     - `context['asset_state'][MY_ASSET].get('watermark')` — keyed by `Asset | 
AssetNameRef | AssetUriRef | AssetAlias`, consistent with `inlet_events[asset]`
     - Single-inlet sugar: `context['asset_state'].get('watermark')` proxies 
through when exactly one concrete inlet exists; raises `ValueError` with a 
clear message for multi-inlet tasks
     - `AssetAlias` inlets are resolved to their concrete assets at context 
build time via `GetAssetsByAlias` comms — 
`context['asset_state'][Asset(name="a")]` works when the alias maps to that 
asset. If the alias resolves to nothing, `asset_state` is present but empty.
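
The keyed and single-inlet access described above can be illustrated with an in-memory stand-in. This sketch does not talk to the supervisor; `FakeAsset`, the dict-backed store, and the exact error message are hypothetical. Raising `ValueError` when there are zero inlets is also an assumption, since the description only specifies the multi-inlet case.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class FakeAsset:
    """Hypothetical stand-in for an Asset; hashable so it can be a dict key."""

    name: str
    uri: str


class AssetStateAccessor:
    """Per-asset view over a shared key/value store."""

    def __init__(self, asset: FakeAsset, store: dict[tuple[str, str], Any]) -> None:
        self._asset = asset
        self._store = store

    def get(self, key: str) -> Any:
        # Returns None when the key was never set (assumed behaviour).
        return self._store.get((self._asset.name, key))

    def set(self, key: str, value: Any) -> None:
        self._store[(self._asset.name, key)] = value


class AssetStateAccessors:
    """Container keyed by asset, with sugar for the single-inlet case."""

    def __init__(self, assets: list[FakeAsset], store: dict[tuple[str, str], Any]) -> None:
        self._accessors = {a: AssetStateAccessor(a, store) for a in assets}

    def __getitem__(self, asset: FakeAsset) -> AssetStateAccessor:
        return self._accessors[asset]

    def __len__(self) -> int:
        return len(self._accessors)

    def get(self, key: str) -> Any:
        # Sugar: only unambiguous when exactly one concrete inlet exists.
        if len(self._accessors) != 1:
            raise ValueError(
                f"asset_state has {len(self._accessors)} inlets; "
                "use asset_state[asset].get(...) instead"
            )
        return next(iter(self._accessors.values())).get(key)
```

An empty container (the "alias resolves to nothing" case) is simply `AssetStateAccessors([], store)`, which is present but has length zero.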
   
   **Context wiring** (`task_runner.py`): both accessors are registered in 
`get_template_context()`. `asset_state` is set for any task with at least one 
asset inlet, including `AssetAlias` inlets.
   
   **Context TypedDict** (`definitions/context.py`): `task_state: 
TaskStateAccessor` and `asset_state: AssetStateAccessors` added.
   
   ## Design choices worth flagging
   
   1. **Asset state routes use `name`/`uri`, not `asset_id`.** Asset names and 
URIs are unique and available directly on the `Asset` object at runtime, and 
this is consistent with `/assets/by-name` and `/assets/by-uri`. It also avoids 
a DB round trip.
   
   2. **`clear()` wiping the entire fleet is opt-in.** `DELETE /state/ti/{ti_id}` 
defaults to clearing only this task instance's `map_index`. Pass 
`?all_map_indices=true` (or call `task_state.clear(all_map_indices=True)`) for 
a fleet-wide wipe across all map indices.
   
   3. **Both keyed and sugar access on `asset_state`.** 
`context['asset_state'][MY_ASSET]` is the primary API. For single-inlet tasks 
(the watcher pattern), `context['asset_state'].get(...)` works as sugar. 
Consistent with `inlet_events[asset]`.
   
   4. **`AssetAlias` resolution is event-driven.** The alias -> concrete asset 
mapping in `expand_alias_to_assets()` is populated only when a producer emits 
through the alias. Until a producer has done so, `asset_state` is present but 
empty.
   
   5. **`__getattr__` not implemented.** Template access like `{{ 
task_state.job_id }}` is not supported yet. Easy to add later.
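
Design choice 2 above (the opt-in fleet-wide `clear()`) can be sketched with a plain dict standing in for the state table. The `(task_id, map_index, key)` layout and the `clear_task_state` function are hypothetical, not the actual schema or API; the point is only that the default scope is a single `map_index`.

```python
from __future__ import annotations


def clear_task_state(
    state: dict[tuple[str, int, str], object],
    task_id: str,
    map_index: int,
    *,
    all_map_indices: bool = False,
) -> int:
    """Delete state rows for a task and return how many were removed.

    By default only rows for the given map_index are removed; passing
    all_map_indices=True removes rows for every map index of the task.
    """
    doomed = [
        row
        for row in state
        if row[0] == task_id and (all_map_indices or row[1] == map_index)
    ]
    for row in doomed:
        del state[row]
    return len(doomed)
```

Making the narrow scope the default means a mapped task instance calling `clear()` without arguments cannot accidentally wipe its siblings' state.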
   
   ## Test plan
   
   - Unit: `TestTaskStateAccessor`, `TestAssetStateAccessor`, 
`TestAssetStateAccessors` in `test_context.py`
   - Supervisor: `TestHandleRequest` in `test_supervisor.py` — includes 
`GetAssetsByAlias` case
   - Client: `TestTaskStateOperations`, `TestAssetStateOperations`, 
`get_by_alias` cases in `test_client.py`
   - Integration: `TestTaskInstanceStateOperations` in `test_task_runner.py` — 
covers multi-inlet keyed access, `AssetUriRef` inlet, `AssetAlias` inlet 
resolving to concrete asset, and empty alias
   
   ### Manual verification for the new asset alias endpoint
   
   DAG:
   ```python
   from airflow.sdk import DAG, task
   from airflow.sdk.definitions.asset import Asset, AssetAlias
   import pendulum
   
   my_asset = Asset(name="test_asset", uri="s3://bucket/test")
   my_alias = AssetAlias("test_alias")
   
   with DAG("alias_producer", schedule=None, start_date=pendulum.datetime(2026, 1, 1)):
   
       @task(outlets=[my_alias, my_asset])
       def produce(**context):
           context["outlet_events"][my_alias].add(my_asset, extra={"run": "1"})
       produce()
   
   
   with DAG("alias_consumer", schedule=None, start_date=pendulum.datetime(2026, 1, 1)):
   
       @task(inlets=[my_alias])
       def consume(**context):
           print("=== inlet_events ===")
           try:
               events = list(context["inlet_events"][my_alias])
               print(f"events: {events}")
           except Exception as e:
               print(f"inlet_events error: {e}")
   
           print("=== asset_state ===")
           print("asset_state:", context.get("asset_state"))
   
           try:
               state = context["asset_state"][my_asset]
               print("current watermark:", state.get("watermark"))
               state.set("watermark", "2026-05-06")
               print("set watermark to 2026-05-06")
               print("read back:", state.get("watermark"))
           except Exception as e:
               print(f"asset_state error: {e}")
   
       consume()
   
   ```
   
   When I run this, I get the following from the consumer task:
   
   <img width="2447" height="1006" alt="image" src="https://github.com/user-attachments/assets/2cdd60e7-0792-431d-bf9d-254ca20c384f" />
   
   That is, the producer emits through the alias, the consumer accesses 
`context['asset_state'][my_asset]`, and setting and reading back the watermark 
works.
   
   
   
   ---
   
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes: Claude Sonnet 4.6
   

