kaxil opened a new pull request, #62825:
URL: https://github.com/apache/airflow/pull/62825

   ## Summary
   
   Adds `AgentOperator` and the `@task.agent` TaskFlow decorator for multi-turn 
agentic LLM workflows. Unlike `LLMOperator` (single prompt → single response), 
the agent reasons about the prompt, calls tools (SQL queries, hook methods, 
etc.) in a loop, and returns a final answer.
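
For readers new to the pattern, the tool-calling loop described above can be illustrated with a toy example. This is only a sketch of the general idea, not how pydantic-ai implements it: `scripted_model`, `run_agent_loop`, and the `count_orders` tool are hypothetical names, and the "model" is a hard-coded function rather than an LLM.

```python
from typing import Callable


def scripted_model(history: list[dict]) -> dict:
    """Hypothetical stand-in for an LLM: request one tool call, then answer."""
    tool_results = [m for m in history if m["role"] == "tool"]
    if not tool_results:
        return {"type": "tool_call", "tool": "count_orders", "arg": "acme"}
    return {"type": "final", "text": f"acme has {tool_results[-1]['content']} orders"}


def run_agent_loop(
    prompt: str,
    model: Callable[[list[dict]], dict],
    tools: dict[str, Callable[[str], str]],
    max_steps: int = 5,
) -> str:
    """Ask the model what to do, run any requested tool, repeat until a final answer."""
    history = [{"role": "user", "content": prompt}]
    for _ in range(max_steps):
        action = model(history)
        if action["type"] == "final":
            return action["text"]
        # Feed the tool result back so the next model call can use it.
        result = tools[action["tool"]](action["arg"])
        history.append({"role": "tool", "content": result})
    raise RuntimeError("agent did not finish within max_steps")


tools = {"count_orders": lambda customer: "42"}  # canned tool result for the demo
answer = run_agent_loop("How many orders does acme have?", scripted_model, tools)
```

In the operator, this loop is handled entirely by pydantic-ai; the snippet only shows why a single agent run can involve several model and tool calls.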
   
   Builds on #62785 which added `SQLToolset` and `HookToolset` — those provide 
the tools, this PR provides the operator/decorator that wires them to an LLM 
agent.
   
   ## Design
   
   `AgentOperator` is a thin wrapper around pydantic-ai's `Agent.run_sync()`:
   
   1. Creates a `PydanticAIHook` from `llm_conn_id` / `model_id`
   2. Calls `hook.create_agent()` with the system prompt, output type, and any 
toolsets
   3. Runs the agent synchronously and returns the output (serialized via 
`model_dump()` for BaseModel outputs)
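
The three steps above can be sketched roughly as follows. This is an illustrative outline, not the code in this PR: `StubHook`, `StubAgent`, `StubResult`, and the dataclass `Analysis` are hypothetical stand-ins for `PydanticAIHook`, pydantic-ai's `Agent`, its run result, and a pydantic `BaseModel`, so the snippet runs without Airflow or an LLM. The keyword names passed to `create_agent()` are assumptions.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class Analysis:
    """Stand-in for a pydantic BaseModel; only model_dump() is mimicked."""

    summary: str
    top_items: list[str]

    def model_dump(self) -> dict[str, Any]:
        return asdict(self)


@dataclass
class StubResult:
    """Stand-in for the agent run result, which carries the final output."""

    output: Any


class StubAgent:
    """Hypothetical stand-in for pydantic-ai's Agent; returns canned output."""

    def __init__(self, output_type: type, system_prompt: str, toolsets: list[Any]):
        self.output_type = output_type
        self.system_prompt = system_prompt
        self.toolsets = toolsets

    def run_sync(self, prompt: str) -> StubResult:
        if self.output_type is str:
            return StubResult(output=f"echo: {prompt}")
        return StubResult(output=self.output_type(summary=prompt, top_items=[]))


class StubHook:
    """Hypothetical stand-in for PydanticAIHook."""

    def __init__(self, llm_conn_id: str, model_id: Optional[str] = None):
        self.llm_conn_id = llm_conn_id
        self.model_id = model_id

    def create_agent(self, output_type: type, system_prompt: str, toolsets: list[Any]) -> StubAgent:
        return StubAgent(output_type, system_prompt, toolsets)


def execute(
    prompt: str,
    llm_conn_id: str,
    model_id: Optional[str] = None,
    system_prompt: str = "",
    output_type: type = str,
    toolsets: Optional[list[Any]] = None,
) -> Any:
    # Step 1: build the hook from the connection and model identifiers.
    hook = StubHook(llm_conn_id=llm_conn_id, model_id=model_id)
    # Step 2: create the agent with the system prompt, output type, and toolsets.
    agent = hook.create_agent(output_type, system_prompt, toolsets or [])
    # Step 3: run to completion and make the output XCom-friendly.
    output = agent.run_sync(prompt).output
    return output.model_dump() if hasattr(output, "model_dump") else output
```

With the default `output_type=str` the call returns a plain string; with `output_type=Analysis` it returns a dict, which is the shape that would be pushed to XCom.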
   
   The operator deliberately does not manage conversation history, streaming, 
or async execution — those are future concerns. It runs the agent to completion 
in a single `execute()` call.
   
   `@task.agent` wraps `AgentOperator` as a `DecoratedOperator`. The user 
function returns the prompt string; all other params (`llm_conn_id`, 
`toolsets`, `system_prompt`, etc.) are decorator kwargs. Same pattern as 
`@task.llm` and `@task.llm_sql`.
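
The split between "function returns the prompt" and "decorator kwargs configure the agent" can be illustrated in plain Python. This is a simplified sketch: `agent_task` and `run_agent` are hypothetical names, and the real `@task.agent` builds a `DecoratedOperator` that runs at task execution time rather than calling anything eagerly.

```python
import functools
from typing import Any, Callable


def run_agent(prompt: str, **agent_kwargs: Any) -> dict[str, Any]:
    """Hypothetical stand-in for the agent run; it only records its inputs."""
    return {"prompt": prompt, "agent_kwargs": agent_kwargs}


def agent_task(**agent_kwargs: Any) -> Callable:
    """Toy decorator: configuration comes from kwargs, the prompt from the function."""

    def decorator(fn: Callable[..., str]) -> Callable[..., dict[str, Any]]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> dict[str, Any]:
            prompt = fn(*args, **kwargs)  # the user function only builds the prompt
            return run_agent(prompt, **agent_kwargs)

        return wrapper

    return decorator


@agent_task(llm_conn_id="my_llm", system_prompt="You are a data analyst.")
def analyze(question: str) -> str:
    return f"Answer: {question}"


result = analyze("top customers?")
```

Here `result["prompt"]` holds the string built by `analyze`, while `result["agent_kwargs"]` holds the decorator arguments unchanged.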
   
   ### Why `output_type` instead of generics?
   
   pydantic-ai's `Agent` is generic over output type, but Airflow operators 
aren't generic. We pass `output_type` as a runtime parameter and serialize 
BaseModel outputs via `model_dump()` for XCom compatibility.
   
   ## Usage
   
   ```python
   # Operator form
   AgentOperator(
       task_id="analyst",
       prompt="What are the top 5 customers by order count?",
       llm_conn_id="my_llm",
       system_prompt="You are a SQL analyst.",
       toolsets=[
           SQLToolset(db_conn_id="postgres_default", allowed_tables=["customers", "orders"])
       ],
   )
   
   # Decorator form
   @task.agent(
       llm_conn_id="my_llm",
       system_prompt="You are a data analyst.",
       toolsets=[SQLToolset(db_conn_id="postgres_default")],
   )
   def analyze(question: str):
       return f"Answer: {question}"
   ```
   
   Structured output with Pydantic models:
   
   ```python
   from pydantic import BaseModel
   
   class Analysis(BaseModel):
       summary: str
       top_items: list[str]
   
   @task.agent(llm_conn_id="my_llm", output_type=Analysis, ...)
   def analyze(question: str):
       return f"Analyze: {question}"
   ```
   
   ## Limitations
   
   - **Synchronous only** — uses `Agent.run_sync()`. Async/streaming support is 
out of scope.
   - **No conversation memory** — each `execute()` is a fresh agent run. 
Multi-turn conversations across task instances would need a separate 
persistence layer.
   - **Toolsets are not serializable** — they can't go through XCom or be 
templated. Must be instantiated at DAG parse time.

