amoghrajesh commented on PR #66160:
URL: https://github.com/apache/airflow/pull/66160#issuecomment-4388552651

   Added a GET `/assets/by-alias` execution API endpoint so that `AssetAlias`
   inlets can be resolved to their concrete assets at context build time. Without
   it, the worker knows only the alias name, not which concrete assets the alias
   maps to, so `context["asset_state"][my_asset]` had no way to work. The
   alias -> asset mapping is event-driven (populated when a producer emits
   through the alias), so `asset_state` is present but empty if the alias hasn't
   been used yet. Handled in:
   [ability to fetch asset_state for aliases too](https://github.com/apache/airflow/pull/66160/commits/59e2825eeb717f9845569755396292677da944e4)
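
To illustrate why `asset_state` stays empty until a producer has emitted through the alias, here is a minimal in-memory sketch of an event-driven alias -> asset mapping. It is not the endpoint's implementation: `AliasRegistry`, `record_event`, and `resolve` are hypothetical names invented for this illustration, and nothing here calls Airflow.

```python
from collections import defaultdict


class AliasRegistry:
    """Hypothetical stand-in for the alias -> asset mapping (not Airflow code)."""

    def __init__(self):
        # Alias name -> set of concrete asset names seen in emitted events.
        self._assets_by_alias = defaultdict(set)

    def record_event(self, alias_name, asset_name):
        # The mapping is only populated when a producer emits through the alias.
        self._assets_by_alias[alias_name].add(asset_name)

    def resolve(self, alias_name):
        # An alias that has never been used resolves to an empty list.
        return sorted(self._assets_by_alias.get(alias_name, set()))


registry = AliasRegistry()
before = registry.resolve("test_alias")             # alias not used yet
registry.record_event("test_alias", "test_asset")   # producer emits through the alias
after = registry.resolve("test_alias")
print(before, after)
```

In this sketch, resolving before any event yields nothing to look state up for, which mirrors the "present but empty" behaviour described above.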
   
   
   DAG:
   ```python
   from airflow.sdk import DAG, task
   from airflow.sdk.definitions.asset import Asset, AssetAlias
   import pendulum
   
   my_asset = Asset(name="test_asset", uri="s3://bucket/test")
   my_alias = AssetAlias("test_alias")
   
   with DAG("alias_producer", schedule=None, start_date=pendulum.datetime(2026, 1, 1)):
   
       @task(outlets=[my_alias, my_asset])
       def produce(**context):
           context["outlet_events"][my_alias].add(my_asset, extra={"run": "1"})
       produce()
   
   
   with DAG("alias_consumer", schedule=None, start_date=pendulum.datetime(2026, 1, 1)):
   
       @task(inlets=[my_alias])
       def consume(**context):
           print("=== inlet_events ===")
           try:
               events = list(context["inlet_events"][my_alias])
               print(f"events: {events}")
           except Exception as e:
               print(f"inlet_events error: {e}")
   
           print("=== asset_state ===")
           print("asset_state:", context.get("asset_state"))
   
           try:
               state = context["asset_state"][my_asset]
               print("current watermark:", state.get("watermark"))
               state.set("watermark", "2026-05-06")
               print("set watermark to 2026-05-06")
               print("read back:", state.get("watermark"))
           except Exception as e:
               print(f"asset_state error: {e}")
   
       consume()
   
   ```
   
   When I do this, this is what I get from the consumer task:
   
   <img width="2447" height="1006" alt="image" src="https://github.com/user-attachments/assets/2cdd60e7-0792-431d-bf9d-254ca20c384f" />
   
   That is, the producer emits through the alias, the consumer accesses
   `context['asset_state'][my_asset]`, and setting and reading back the
   watermark works.
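
As a rough sketch of how a get/set watermark like the one above might drive incremental processing, the snippet below uses a hypothetical in-memory stand-in for the state object. Only `get` and `set` with a `"watermark"` key come from the example in this comment; `InMemoryAssetState` and `process_new_partitions` are assumptions made for illustration and are not part of Airflow.

```python
class InMemoryAssetState:
    """Hypothetical stand-in exposing the get/set calls used in the consumer task."""

    def __init__(self):
        self._values = {}

    def get(self, key):
        # Returns None when the key has never been set.
        return self._values.get(key)

    def set(self, key, value):
        self._values[key] = value


def process_new_partitions(state, partitions):
    """Return partitions newer than the stored watermark, then advance it."""
    watermark = state.get("watermark")
    # ISO-8601 date strings compare correctly as plain strings.
    new = [p for p in partitions if watermark is None or p > watermark]
    if new:
        state.set("watermark", max(new))
    return new


state = InMemoryAssetState()
first = process_new_partitions(state, ["2026-05-05", "2026-05-06"])
second = process_new_partitions(state, ["2026-05-05", "2026-05-06", "2026-05-07"])
print(first, second, state.get("watermark"))
```

The second call only sees the partition past the stored watermark, which is the kind of behaviour the set/read-back check in the consumer task makes possible.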
   


