kaxil opened a new pull request, #67644:
URL: https://github.com/apache/airflow/pull/67644
## Summary
When you pass `output_type=SomePydanticModel` to `LLMOperator` /
`LLMAgentOperator` / `LLMFileAnalysisOperator` (or the matching `@task.llm` /
`@task.agent` / `@task.llm_file_analysis` decorators), the operator used to
call `model_dump()` on the result before pushing it to XCom. So even though the
upstream declared a type, the downstream task got a dict and had to use
`analysis["priority"]` instead of `analysis.priority`.
This PR drops that `model_dump()` call. The Pydantic instance flows through
XCom as-is. Downstream tasks can type-hint the class and use attribute access.
Motivated by Jed's question :) "It would be nice if that could be
`List[TicketAnalysis]` instead" of `list[dict]`.
## What it looks like
```python
from pydantic import BaseModel

from airflow.sdk import task


class TicketAnalysis(BaseModel):
    priority: str
    category: str


@task.llm(llm_conn_id="pydanticai_default", output_type=TicketAnalysis)
def analyze(ticket: str) -> str:
    return f"Analyze: {ticket}"


@task
def store(analyses: list[TicketAnalysis]):  # was: list[dict]
    for a in analyses:
        print(a.priority, a.category)  # was: a["priority"]
```
## How it works (and why this design)
The Pydantic deserializer that turns the wire bytes back into a class
instance already exists (see
[`task-sdk/.../serde/__init__.py:286`](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/serde/__init__.py#L286)).
What gates it is the allow-list check at `serde/__init__.py:264`, which only
lets through classnames that match `[core] allowed_deserialization_classes` or
live in the process-local `_extra_allowed` set.
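To make the gate concrete, here is a minimal sketch of the check described above. It is not the code in `serde/__init__.py`: the function name `is_allowed` is hypothetical, and it assumes the config value has already been split into glob-style patterns (matched here with `fnmatch.fnmatchcase`).

```python
from fnmatch import fnmatchcase


def is_allowed(classname: str, patterns: list[str], extra_allowed: set[str]) -> bool:
    """Hypothetical allow-list gate: True if ``classname`` may be imported.

    ``patterns`` stands in for ``[core] allowed_deserialization_classes``
    (assumed to be glob patterns); ``extra_allowed`` stands in for the
    process-local ``_extra_allowed`` set of exact qualnames.
    """
    # Exact qualnames registered at runtime are checked first.
    if classname in extra_allowed:
        return True
    # Otherwise fall back to the deployment-wide glob patterns.
    return any(fnmatchcase(classname, pattern) for pattern in patterns)


# A class that is neither configured nor registered is rejected
# before import_string() would ever run.
print(is_allowed("my_dags.models.TicketAnalysis", ["airflow.*"], set()))  # False
```

The design point is that the two sources are independent: the config covers deployment-wide trust, while the process-local set only ever contains classes some code in this process explicitly registered.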
Lifting that gate for all Pydantic models is what bolkedebruin pushed back
on in PR [#51059](https://github.com/apache/airflow/pull/51059):
`import_string` runs before any type check, and Pydantic validators
(`@field_validator`, `@model_validator`) can execute arbitrary code during
`model_validate`. So "trust any class as long as it inherits `BaseModel`"
reopens an attack surface that was deliberately closed.
But in [his same
review](https://github.com/apache/airflow/pull/51059#issuecomment-2922059876)
he flagged the door:
> "We might need to think of a mechanism that allows serializers to register
'allowed' classes, but that's probably out of scope for now (let's not include
it now)."
This PR walks through that door. New helper in `airflow.sdk.serde`:
```python
def allow_class(cls: type) -> None:
    # adds qualname(cls) to the process-local _extra_allowed set
    ...
```
Each LLM operator calls `allow_class(output_type)` from `__init__`. The
threat model is the same as a config edit: the DAG author put
`output_type=MyModel` in code that's already trusted. An attacker who can
change that argument already has DAG-file write access, which already amounts to RCE.
The reason same-DAG downstream tasks just work without any config: every
worker that runs *any* task in the DAG parses the DAG file at startup, which
re-runs every operator's `__init__`, which calls `allow_class` again.
Process-local, idempotent.
`output_type` can be a single class, a Union, an Optional, a `list[Model]`,
etc. (pydantic-ai accepts all of those). The new `iter_base_model_classes`
helper walks the type tree and registers each reachable `BaseModel` so
Union/Optional outputs work too.
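A minimal sketch of that type-tree walk, using only `typing.get_origin`/`get_args`. It is not the provider's `iter_base_model_classes`: the name `iter_subclasses_in_type` is hypothetical, and the base class is a parameter so the sketch runs without Pydantic (the provider would check against `BaseModel`).

```python
from collections.abc import Iterator
from typing import Any, Optional, Union, get_args, get_origin


def iter_subclasses_in_type(tp: Any, base: type) -> Iterator[type]:
    """Yield each class reachable from ``tp`` that subclasses ``base``, once, in order."""
    seen: set[type] = set()

    def walk(node: Any) -> Iterator[type]:
        if get_origin(node) is not None:
            # Union, Optional, X | Y, list[...], dict[...], Annotated[...]:
            # recurse into the type parameters.
            for arg in get_args(node):
                yield from walk(arg)
        elif isinstance(node, type) and issubclass(node, base) and node not in seen:
            # A plain class of the kind we care about; skip duplicates.
            seen.add(node)
            yield node

    yield from walk(tp)


class Base: ...
class A(Base): ...
class B(Base): ...

print([c.__name__ for c in iter_subclasses_in_type(Optional[Union[A, list[B]]], Base)])  # ['A', 'B']
```

Checking `get_origin` before `isinstance(node, type)` matters: on some Python versions a parametrised builtin such as `list[int]` also passes `isinstance(..., type)`, and `issubclass` would then fail on it.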
## Working demo
Local run with a minimal DAG that mirrors what
`LLMOperator(output_type=TicketAnalysis)` does internally -- registers the
class via `allow_class`, returns the instance from one task, attribute-accesses
it from the downstream task.
<img width="1280" height="720" alt="dag_run_grid"
src="https://github.com/user-attachments/assets/6e05a78c-48fc-4cc3-88e6-6e467338543d"
/>
<img width="1920" height="1080" alt="produce_xcom_wide"
src="https://github.com/user-attachments/assets/08f2fde1-cec5-4119-8b1b-8a66320ab2e6"
/>
The producer task's log line confirms the value flows as a Pydantic
instance, not a dict:
```
INFO - Done. Returned value was: priority='high' category='bug' summary='Nightly ETL failing with Postgres connection timeout' suggested_action='Page the on-call DBA and check connection pool size'
```
The consumer task receives it as `TicketAnalysis` and uses attribute access.
The UI XCom viewer renders it via the existing `stringify` path:
```
<module>.TicketAnalysis@version=1(priority=high,category=bug,summary=...,suggested_action=...)
```
Not pretty (no field-by-field rendering today), but the value shows without
any allow-list edit on the deployment.
## What doesn't get auto-registered
One path still needs `[core] allowed_deserialization_classes` updated:
`xcom_pull` from a *different* DAG. The consumer DAG's worker only parses its
own DAG file, so the producer's `LLMOperator.__init__` never runs there. That
case is the same as today and is called out in the operator guides.
The UI XCom display goes through `stringify` (not `deserialize`), so it
works without config -- I confirmed this in the live demo above. Earlier drafts
of this PR overstated the limitation; the docs in this version match the actual
behaviour.
## Fail-fast on classes that can't round-trip
`allow_class` rejects classes whose qualname can't be re-imported:
- defined inside a function body (`<locals>` in `__qualname__`)
- nested inside another class (dotted `__qualname__`)
- dynamically built with a mismatched `__name__` (e.g. `MyModel =
pydantic.create_model("Different", ...)`)
- parametrised generics (`Result[int]`)
Without this guard the failure shows up at the downstream consumer's
`import_string()` call with no hint at the root cause. With the guard it raises
a clear `ValueError` at DAG parse time, pointing at the operator that owns the
bad `output_type`. The example DAGs that previously defined their Pydantic
class inside the `@dag` body are updated to put them at module scope.
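The four rejection rules can be approximated with a module-scope round-trip check. This is a sketch, not the PR's `allow_class`: `check_round_trips` is a hypothetical name, and the specific tests (string checks on `__qualname__`, then re-importing the module and comparing object identity) are an assumption about how such a guard could work.

```python
import importlib


def check_round_trips(cls: type) -> None:
    """Raise ValueError unless ``module.qualname`` re-imports to ``cls`` itself."""
    qualname = cls.__qualname__
    name = f"{cls.__module__}.{qualname}"
    if "<locals>" in qualname:
        raise ValueError(f"{name}: defined inside a function body; move it to module scope")
    if "[" in qualname:
        raise ValueError(f"{name}: parametrised generic; its qualname cannot be re-imported")
    if "." in qualname:
        raise ValueError(f"{name}: nested inside another class; move it to module scope")
    try:
        module = importlib.import_module(cls.__module__)
    except ImportError as exc:
        raise ValueError(f"{name}: module {cls.__module__!r} is not importable") from exc
    # Catches dynamically built classes whose __name__ differs from the variable
    # they were assigned to, e.g. MyModel = create_model("Different", ...).
    if getattr(module, qualname, None) is not cls:
        raise ValueError(f"{name}: does not resolve back to the same class object")


try:
    check_round_trips(type("Different", (), {"__module__": "collections"}))
except ValueError as err:
    print(err)  # collections.Different: does not resolve back to the same class object
```

Running a check like this at registration time is what moves the failure from the downstream `import_string()` call to DAG parse time.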
## Backwards compatibility
The provider is in the `incubation` lifecycle stage (`0.3.0`), so the breaking change to
the XCom value shape is permitted by the API contract reservation. Migration
note added to the top of
[`providers/common/ai/docs/changelog.rst`](https://github.com/apache/airflow/blob/main/providers/common/ai/docs/changelog.rst).
The `allow_class` helper itself is new to `airflow.sdk.serde`. The provider
still declares `apache-airflow>=3.0.0`, so I added a try/except import: when
running against an older Airflow that doesn't have `allow_class`, the operators
fall back to `model_dump()` (the previous behaviour). Users on the new Airflow
get the typed path; users on older versions keep the dict path.
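A minimal sketch of that compatibility shim. The import path comes from this PR; `to_xcom_value` is a hypothetical helper name, and the operators in the PR may structure the branch differently.

```python
from typing import Any

try:
    # Present only on Airflow versions that ship this PR's helper.
    from airflow.sdk.serde import allow_class
except ImportError:
    # Older Airflow (or no Airflow at all): no typed path available.
    allow_class = None  # type: ignore[assignment]


def to_xcom_value(result: Any) -> Any:
    """Hypothetical helper: shape an LLM result before it is pushed to XCom."""
    if allow_class is None and hasattr(result, "model_dump"):
        # Previous behaviour: downstream tasks receive a plain dict.
        return result.model_dump()
    # New behaviour: the Pydantic instance flows through XCom unchanged.
    return result
```

Catching `ImportError` covers both cases: Airflow not installed at all (`ModuleNotFoundError` is a subclass) and an older `airflow.sdk.serde` that lacks the name.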