kaxil opened a new pull request, #67644:
URL: https://github.com/apache/airflow/pull/67644

   ## Summary
   
   When you pass `output_type=SomePydanticModel` to `LLMOperator` / 
`LLMAgentOperator` / `LLMFileAnalysisOperator` (or the matching `@task.llm` / 
`@task.agent` / `@task.llm_file_analysis` decorators), the operator used to 
call `model_dump()` on the result before pushing it to XCom. So even though the 
upstream declared a type, the downstream task got a dict and had to use 
`analysis["priority"]` instead of `analysis.priority`.
   
   This PR drops that `model_dump()` call. The Pydantic instance flows through 
XCom as-is. Downstream tasks can type-hint the class and use attribute access.
   
   Motivated by Jed's question :) "It would be nice if that could be 
`List[TicketAnalysis]` instead" of `list[dict]`.
   
   ## What it looks like
   
   ```python
   class TicketAnalysis(BaseModel):
       priority: str
       category: str
   
   @task.llm(llm_conn_id="pydanticai_default", output_type=TicketAnalysis)
   def analyze(ticket: str) -> str:
       return f"Analyze: {ticket}"
   
   @task
   def store(analyses: list[TicketAnalysis]):       # was: list[dict]
       for a in analyses:
           print(a.priority, a.category)            # was: a["priority"]
   ```
   
   ## How it works (and why this design)
   
   The Pydantic deserializer that turns the wire bytes back into a class 
instance already exists (see 
[`task-sdk/.../serde/__init__.py:286`](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/serde/__init__.py#L286)).
 What gates it is the allow-list check at `serde/__init__.py:264`, which only 
lets through classnames that match `[core] allowed_deserialization_classes` or 
live in the process-local `_extra_allowed` set.
   
   Lifting that gate for all Pydantic models is what bolkedebruin pushed back 
on in PR [#51059](https://github.com/apache/airflow/pull/51059): 
`import_string` runs before any type check, and Pydantic validators 
(`@field_validator`, `@model_validator`) can execute arbitrary code during 
`model_validate`. So "trust any class as long as it inherits `BaseModel`" 
reopens an attack surface that was deliberately closed.
   
   But in [his same 
review](https://github.com/apache/airflow/pull/51059#issuecomment-2922059876) 
he flagged the door:
   
   > "We might need to think of a mechanism that allows serializers to register 
'allowed' classes, but that's probably out of scope for now (let's not include 
it now)."
   
   This PR walks through that door. New helper in `airflow.sdk.serde`:
   
   ```python
   def allow_class(cls: type) -> None:
       # adds qualname(cls) to the process-local _extra_allowed set
   ```
   
   Each LLM operator calls `allow_class(output_type)` from `__init__`. The 
threat model is the same as a config edit: the DAG author put 
`output_type=MyModel` in code that's already trusted. An attacker who can 
change that argument already has DAG-file write access, which is RCE.
   
   The reason same-DAG downstream tasks just work without any config: every 
worker that runs *any* task in the DAG parses the DAG file at startup, which 
re-runs every operator's `__init__`, which calls `allow_class` again. 
Process-local, idempotent.
   
   `output_type` can be a single class, a Union, an Optional, a `list[Model]`, 
etc. (pydantic-ai accepts all of those). The new `iter_base_model_classes` 
helper walks the type tree and registers each reachable `BaseModel` so 
Union/Optional outputs work too.
   
   ## Working demo
   
   Local run with a minimal DAG that mirrors what 
`LLMOperator(output_type=TicketAnalysis)` does internally -- registers the 
class via `allow_class`, returns the instance from one task, attribute-accesses 
it from the downstream task.
   
   <img width="1280" height="720" alt="dag_run_grid" 
src="https://github.com/user-attachments/assets/6e05a78c-48fc-4cc3-88e6-6e467338543d";
 />
   <img width="1920" height="1080" alt="produce_xcom_wide" 
src="https://github.com/user-attachments/assets/08f2fde1-cec5-4119-8b1b-8a66320ab2e6";
 />
   
   
   The producer task's log line confirms the value flows as a Pydantic 
instance, not a dict:
   
   ```
   INFO - Done. Returned value was: priority='high' category='bug'
          summary='Nightly ETL failing with Postgres connection timeout'
          suggested_action='Page the on-call DBA and check connection pool size'
   ```
   
   The consumer task receives it as `TicketAnalysis` and uses attribute access. 
The UI XCom viewer renders it via the existing `stringify` path:
   
   ```
   
<module>.TicketAnalysis@version=1(priority=high,category=bug,summary=...,suggested_action=...)
   ```
   
   Not pretty (no field-by-field rendering today), but the value shows without 
any allow-list edit on the deployment.
   
   ## What doesn't get auto-registered
   
   One path still needs `[core] allowed_deserialization_classes` updated: 
`xcom_pull` from a *different* DAG. The consumer DAG's worker only parses its 
own DAG file, so the producer's `LLMOperator.__init__` never runs there. That 
case is the same as today and is called out in the operator guides.
   
   The UI XCom display goes through `stringify` (not `deserialize`), so it 
works without config -- I confirmed this in the live demo above. Earlier drafts 
of this PR overstated the limitation; the docs in this version match the actual 
behaviour.
   
   ## Fail-fast on classes that can't round-trip
   
   `allow_class` rejects classes whose qualname can't be re-imported:
   
   - defined inside a function body (`<locals>` in `__qualname__`)
   - nested inside another class (dotted `__qualname__`)
   - dynamically built with a mismatched `__name__` (e.g. `MyModel = 
pydantic.create_model("Different", ...)`)
   - parametrised generics (`Result[int]`)
   
   Without this guard the failure shows up at the downstream consumer's 
`import_string()` call with no hint at the root cause. With the guard it raises 
a clear `ValueError` at DAG parse time, pointing at the operator that owns the 
bad `output_type`. The example DAGs that previously defined their Pydantic 
class inside the `@dag` body are updated to put them at module scope.
   
   ## Backwards compatibility
   
   Provider is in `incubation` lifecycle (`0.3.0`), so the breaking change to 
the XCom value shape is permitted by the API contract reservation. Migration 
note added to the top of 
[`providers/common/ai/docs/changelog.rst`](https://github.com/apache/airflow/blob/main/providers/common/ai/docs/changelog.rst).
   
   The `allow_class` helper itself is new to `airflow.sdk.serde`. The provider 
still declares `apache-airflow>=3.0.0`, so I added a try/except import: when 
running against an older Airflow that doesn't have `allow_class`, the operators 
fall back to `model_dump()` (the previous behaviour). Users on the new Airflow 
get the typed path; users on older versions keep the dict path.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to