jgoedeke commented on issue #57564:
URL: https://github.com/apache/airflow/issues/57564#issuecomment-3495875957
I found a good workaround to get a typed object out of the extra, which
works with Airflow 3.1.2:
Pass a Pydantic `BaseModel` instance as the extra and call
`.model_validate()` on the extra when retrieving it. Airflow serializes the
extra automatically when writing it into the database, so what comes back is
plain data that has to be validated into the model again. Here is the
modified example from above:
```python
from datetime import UTC, datetime
from pathlib import Path

from airflow.sdk import Asset, Metadata, dag, task
from pydantic import BaseModel


class AssetExtra(BaseModel):
    path: Path


class AssetEventExtra(BaseModel):
    timestamp: datetime


asset = Asset(
    's3://bucket/mydata.csv',
    extra=AssetExtra(
        path=Path('/some/local/path.txt'),
    ),
)


@dag(schedule=None)
def producer_dag():
    @task(outlets=[asset])
    def produce_asset():
        yield Metadata(asset,
                       AssetEventExtra(timestamp=datetime.now(tz=UTC)))

    produce_asset()


@dag(schedule=[asset])
def consumer_dag():
    @task(inlets=[asset])
    def consume_asset(inlet_events):
        events: list[Metadata] = inlet_events[asset]
        for event in events:
            event_extra = AssetEventExtra.model_validate(event.extra)
            asset_extra = AssetExtra.model_validate(event.asset.extra)
            print(f'Asset extra: {asset_extra}')
            print(f'Extra metadata: {event_extra}')

    consume_asset()


producer_dag()
consumer_dag()
```
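To see why `.model_validate()` is enough on the consumer side, here is a small sketch of the round trip without Airflow. It assumes the stored extra comes back as a plain JSON-compatible dict; `simulate_storage` is a hypothetical stand-in for the database write and read, not an Airflow function.

```python
import json
from datetime import datetime, timezone
from pathlib import Path

from pydantic import BaseModel


class AssetExtra(BaseModel):
    path: Path


class AssetEventExtra(BaseModel):
    timestamp: datetime


def simulate_storage(model: BaseModel) -> dict:
    """Hypothetical stand-in for a JSON round trip through a database."""
    # Dump the model to a JSON string, then parse it back into a plain dict.
    return json.loads(model.model_dump_json())


original_event = AssetEventExtra(
    timestamp=datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
)
original_asset = AssetExtra(path=Path('/some/local/path.txt'))

# After the round trip the values are plain strings, not datetime or Path.
stored_event = simulate_storage(original_event)
stored_asset = simulate_storage(original_asset)

# model_validate() parses the strings back into the typed fields.
restored_event = AssetEventExtra.model_validate(stored_event)
restored_asset = AssetExtra.model_validate(stored_asset)
```

The typed fields (`datetime`, `Path`) are lost in storage and recovered by validation, which is the same thing the consumer task does with `event.extra` and `event.asset.extra`.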
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.