jgoedeke commented on issue #57564:
URL: https://github.com/apache/airflow/issues/57564#issuecomment-3495875957

   I found a good workaround for getting a typed object out of the extra. It 
works with Airflow 3.1.2:
   
   Pass a Pydantic `BaseModel` as the extra and call `.model_validate()` on 
the extra when retrieving it. Airflow serializes the extra automatically when 
writing it to the database. Here is the modified example from above:
   
   ```python
   from datetime import UTC, datetime
   from pathlib import Path
   
   from airflow.sdk import Asset, Metadata, dag, task
   from pydantic import BaseModel
   
   
   class AssetExtra(BaseModel):
       path: Path
   
   
   class AssetEventExtra(BaseModel):
       timestamp: datetime
   
   
   asset = Asset(
       's3://bucket/mydata.csv',
       extra=AssetExtra(
           path=Path('/some/local/path.txt'),
       ),
   )
   
   
   @dag(schedule=None)
   def producer_dag():
       @task(outlets=[asset])
       def produce_asset():
           yield Metadata(asset, AssetEventExtra(timestamp=datetime.now(tz=UTC)))
   
       produce_asset()
   
   
   @dag(schedule=[asset])
   def consumer_dag():
       @task(inlets=[asset])
       def consume_asset(inlet_events):
           events: list[Metadata] = inlet_events[asset]
           for event in events:
               event_extra = AssetEventExtra.model_validate(event.extra)
               asset_extra = AssetExtra.model_validate(event.asset.extra)
               print(f'Asset extra: {asset_extra}')
               print(f'Extra metadata: {event_extra}')
   
       consume_asset()
   
   
   producer_dag()
   consumer_dag()
   
   ```
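
To see why `.model_validate()` is needed on the consumer side, here is a minimal sketch of the round trip without Airflow. It assumes the stored extra comes back as a plain JSON-compatible dict; I approximate that with `model_dump(mode="json")` plus a JSON round trip, which may not be exactly what Airflow does internally. The combined `AssetEventExtra` model below is only for illustration.

```python
import json
from datetime import datetime, timezone
from pathlib import Path

from pydantic import BaseModel


class AssetEventExtra(BaseModel):
    # Illustrative model combining the two field types used in the example above.
    timestamp: datetime
    path: Path


original = AssetEventExtra(
    timestamp=datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc),
    path=Path("/some/local/path.txt"),
)

# Assumption: what the consumer reads back is a JSON-compatible dict.
# A dump in JSON mode followed by a JSON round trip stands in for the database here.
stored = json.loads(json.dumps(original.model_dump(mode="json")))

# At this point the typed values have become plain strings.
print(type(stored["timestamp"]), type(stored["path"]))

# model_validate() parses the strings back into datetime and Path.
restored = AssetEventExtra.model_validate(stored)
print(restored)
```

The point is that the typed values survive only as strings in storage, so the consumer has to validate again to get `datetime` and `Path` objects back.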
   

