kaxil opened a new pull request, #62825:
URL: https://github.com/apache/airflow/pull/62825
## Summary
Adds `AgentOperator` and the `@task.agent` TaskFlow decorator for multi-turn
agentic LLM workflows. Unlike `LLMOperator` (single prompt → single response),
the agent reasons about the prompt, calls tools (SQL queries, hook methods,
etc.) in a loop, and returns a final answer.
Builds on #62785 which added `SQLToolset` and `HookToolset` — those provide
the tools, this PR provides the operator/decorator that wires them to an LLM
agent.
## Design
`AgentOperator` is a thin wrapper around pydantic-ai's `Agent.run_sync()`:
1. Creates a `PydanticAIHook` from `llm_conn_id` / `model_id`
2. Calls `hook.create_agent()` with the system prompt, output type, and any
toolsets
3. Runs the agent synchronously and returns the output (serialized via
`model_dump()` for BaseModel outputs)
The operator deliberately does not manage conversation history, streaming,
or async execution — those are future concerns. It runs the agent to completion
in a single `execute()` call.
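The three steps above can be sketched as a minimal, self-contained `execute()`. This is an illustration only: `PydanticAIHook.create_agent()` is stubbed out behind `_create_agent()`, and the agent is duck-typed (anything with `run_sync()` returning a result with an `.output` attribute), so the control flow can be shown without a live LLM connection.

```python
from typing import Any


class AgentOperator:
    """Sketch of the operator's execute flow; not the real implementation."""

    def __init__(self, task_id, prompt, llm_conn_id, system_prompt=None,
                 toolsets=None, output_type=str):
        self.task_id = task_id
        self.prompt = prompt
        self.llm_conn_id = llm_conn_id
        self.system_prompt = system_prompt
        self.toolsets = toolsets or []
        self.output_type = output_type

    def _create_agent(self):
        # In the PR this builds a PydanticAIHook from llm_conn_id and calls
        # hook.create_agent(...); stubbed here to keep the sketch runnable.
        raise NotImplementedError

    def execute(self, context) -> Any:
        agent = self._create_agent()
        # Run the agent to completion in one synchronous call.
        result = agent.run_sync(self.prompt)
        output = result.output
        # Serialize BaseModel outputs for XCom (duck-typed check).
        if hasattr(output, "model_dump"):
            return output.model_dump()
        return output
```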
`@task.agent` wraps `AgentOperator` as a `DecoratedOperator`. The user
function returns the prompt string; all other params (`llm_conn_id`,
`toolsets`, `system_prompt`, etc.) are decorator kwargs. Same pattern as
`@task.llm` and `@task.llm_sql`.
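The wiring can be sketched as a plain decorator factory. This is a simplification of Airflow's `DecoratedOperator` machinery, with a stand-in stub instead of the real `AgentOperator`, but it shows the key mapping: decorator kwargs become operator kwargs, and the wrapped function's return value becomes the prompt.

```python
import functools


class _AgentOperatorStub:
    """Stand-in for AgentOperator so the wiring can be shown in isolation."""

    def __init__(self, task_id, prompt, **kwargs):
        self.task_id = task_id
        self.prompt = prompt
        self.kwargs = kwargs


def agent(**operator_kwargs):
    """Sketch of @task.agent: the decorated function returns the prompt
    string; everything else is passed through from the decorator."""

    def decorator(func):
        @functools.wraps(func)
        def factory(*args, **kwargs):
            prompt = func(*args, **kwargs)  # user code builds the prompt
            return _AgentOperatorStub(
                task_id=func.__name__, prompt=prompt, **operator_kwargs
            )

        return factory

    return decorator


@agent(llm_conn_id="my_llm")
def analyze(question: str):
    return f"Answer: {question}"
```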
### Why `output_type` instead of generics?
pydantic-ai's `Agent` is generic over output type, but Airflow operators
aren't generic. We pass `output_type` as a runtime parameter and serialize
BaseModel outputs via `model_dump()` for XCom compatibility.
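The serialization step can be illustrated with a hypothetical `serialize_output` helper (not a name from the PR) that mirrors the behavior described above, assuming pydantic v2:

```python
from pydantic import BaseModel


class Analysis(BaseModel):
    summary: str
    top_items: list[str]


def serialize_output(output):
    # Mirrors the operator's behavior: BaseModel results become plain
    # dicts so they are JSON-friendly for XCom; other types pass through.
    if isinstance(output, BaseModel):
        return output.model_dump()
    return output
```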
## Usage
```python
# Operator form
AgentOperator(
    task_id="analyst",
    prompt="What are the top 5 customers by order count?",
    llm_conn_id="my_llm",
    system_prompt="You are a SQL analyst.",
    toolsets=[
        SQLToolset(
            db_conn_id="postgres_default",
            allowed_tables=["customers", "orders"],
        )
    ],
)


# Decorator form
@task.agent(
    llm_conn_id="my_llm",
    system_prompt="You are a data analyst.",
    toolsets=[SQLToolset(db_conn_id="postgres_default")],
)
def analyze(question: str):
    return f"Answer: {question}"
```
Structured output with Pydantic models:
```python
class Analysis(BaseModel):
    summary: str
    top_items: list[str]


@task.agent(llm_conn_id="my_llm", output_type=Analysis, ...)
def analyze(question: str):
    return f"Analyze: {question}"
```
## Limitations
- **Synchronous only** — uses `Agent.run_sync()`. Async/streaming support is
out of scope.
- **No conversation memory** — each `execute()` is a fresh agent run.
Multi-turn conversations across task instances would need a separate
persistence layer.
- **Toolsets are not serializable** — they can't go through XCom or be
  templated, so they must be instantiated at DAG parse time.