gopidesupavan opened a new pull request, #67438:
URL: https://github.com/apache/airflow/pull/67438
# Add BaseAIHook and Update usage
## Summary
Introduce `BaseAIHook`, a backend-neutral contract for multi-turn LLM agents
in the `common-ai` provider. `AgentOperator` and `@task.agent` now resolve the
agent runtime from the connection `conn_type` (for example `pydanticai`,
`pydanticai-bedrock`, `pydanticai-azure`) and delegate all framework-specific
work to the hook.
`PydanticAIHook` is the first implementation. All LLM operators and
`LLMRetryPolicy` are migrated to a shared `AgentRunRequest` / `run_agent` API.
`SQLToolset` is migrated to the new framework-agnostic `BaseToolset` interface.
This lays the foundation for additional agent backends without adding
parallel operator classes per framework. A follow-up PR will add **AWS
Strands** as the next hook implementation; this contract also opens the door
for **Google ADK** and other agent runtimes behind the same `AgentOperator` /
`@task.agent` surface.
---
## Motivation
Before this change:
- `AgentOperator` contained pydantic-ai-specific logic (tool wrapping,
durable caching, agent construction).
- `PydanticAIHook.create_agent()` / `run_agent()` used ad-hoc keyword
arguments.
- Tool logging and durable execution were handled in the operator layer via
`LoggingToolset` / `CachingToolset` wrappers.
The operator should stay framework-agnostic. Hooks should own agent
lifecycle, tool resolution, durable execution, and normalized results.
---
## Design
### `BaseAIHook` contract
New abstract hook with:
| Method / property | Purpose |
|-------------------|---------|
| `get_model()` | Return backend model/client |
| `get_conn()` | Compatibility shim → `get_model()` |
| `create_agent(request)` | Build (but do not run) the agent |
| `run_agent(agent, request)` | Execute and return `AgentRunResult` |
| `_tool_spec_to_native(spec)` | Convert `ToolSpec` → native tool representation |
| `get_agent_hook(conn_id)` | Resolve hook from connection `conn_type` |
Capability flags: `supports_toolsets`, `supports_durable`,
`supports_usage_limits`.
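As a rough illustration of the contract's shape, here is a minimal sketch. `SketchAIHook`, `SketchRequest`, and `EchoHook` are stand-in names invented for this example, and the flag defaults and method signatures are assumptions; the real definitions are in the PR diff. `_tool_spec_to_native` and `get_agent_hook` are left out to keep it short.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchRequest:
    """Hypothetical stand-in for AgentRunRequest; only the prompt is modelled."""

    prompt: str


class SketchAIHook(ABC):
    """Hypothetical stand-in for BaseAIHook, not the provider's real class."""

    # Assumed defaults: capability flags start off and a backend opts in.
    supports_toolsets = False
    supports_durable = False
    supports_usage_limits = False

    @abstractmethod
    def get_model(self) -> Any:
        """Return the backend model or client."""

    def get_conn(self) -> Any:
        # Compatibility shim: callers of get_conn() receive the model.
        return self.get_model()

    @abstractmethod
    def create_agent(self, request: SketchRequest) -> Any:
        """Build the agent without running it."""

    @abstractmethod
    def run_agent(self, agent: Any, request: SketchRequest) -> Any:
        """Run the agent and return a normalized result."""


class EchoHook(SketchAIHook):
    """Toy backend that echoes the prompt instead of calling an LLM."""

    supports_toolsets = True

    def get_model(self) -> str:
        return "echo-model"

    def create_agent(self, request: SketchRequest) -> dict:
        return {"model": self.get_model()}

    def run_agent(self, agent: dict, request: SketchRequest) -> str:
        return f"{agent['model']}: {request.prompt}"


hook = EchoHook()
request = SketchRequest(prompt="hello")
agent = hook.create_agent(request)
print(hook.run_agent(agent, request))
```

The same request object is passed to both `create_agent` and `run_agent`, which mirrors the two-step call sequence described in this PR.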
### Parameter objects
- **`AgentRunRequest`** — prompt, output type, instructions, toolsets, usage
limits, message history, durable context, agent params
- **`AgentRunResult`** — output, message history, model name, usage, tool
names, durable stats
- **`ToolSpec`** — framework-neutral tool descriptor (name, description,
JSON schema, callable)
- **`BaseToolset`** — abstract `as_tools() → list[ToolSpec]`
- **`DurableContext`** / **`DurableStats`** — durable execution identity and
cache statistics
### Shared hook helpers
Moved into `BaseAIHook`:
- `_resolve_tools()` — converts `BaseToolset`, plain callables, and native
tool objects
- `_logged_callable()` — per-tool real-time logging
- `_cached_callable()` — per-tool durable step caching
- `_init_durable()` — `DurableStorage` / `DurableStepCounter` setup
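To make the two per-tool wrappers concrete, here is a minimal sketch of the idea. The function names, the plain `dict` used as storage, and the `"name:step"` cache key are all assumptions for illustration; the actual helpers rely on `DurableStorage` / `DurableStepCounter`, whose internals are not shown in this description.

```python
import functools
import logging
from typing import Any, Callable

log = logging.getLogger("sketch.tools")


def logged_callable(name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
    """Log each call and its result as soon as the tool runs."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        log.info("tool %s called args=%r kwargs=%r", name, args, kwargs)
        result = fn(*args, **kwargs)
        log.info("tool %s returned %r", name, result)
        return result

    return wrapper


def cached_callable(
    name: str, fn: Callable[..., Any], storage: dict, stats: dict
) -> Callable[..., Any]:
    """Replay results from ``storage``, keyed by tool name and call order."""
    step = 0

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        nonlocal step
        key = f"{name}:{step}"
        step += 1
        if key in storage:
            stats["hits"] += 1
            return storage[key]
        stats["misses"] += 1
        storage[key] = fn(*args, **kwargs)
        return storage[key]

    return wrapper


calls = []


def add(a: int, b: int) -> int:
    calls.append((a, b))
    return a + b


storage: dict = {}
stats = {"hits": 0, "misses": 0}

first_try = cached_callable("add", logged_callable("add", add), storage, stats)
first_result = first_try(1, 2)

# A retry builds a fresh wrapper over the same storage, so step 0 is replayed
# from the cache and the underlying tool is not called again.
retry = cached_callable("add", logged_callable("add", add), storage, stats)
retry_result = retry(1, 2)
```

Putting the caching wrapper on the outside means a replayed step skips both the tool and its log lines; whether the hook orders the wrappers this way is not stated here.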
### `PydanticAIHook` implementation
- Implements the full `BaseAIHook` contract
- Splits toolsets into two paths:
  - **`AbstractToolset`** (`HookToolset`, `MCPToolset`, `DataFusionToolset`,
    third-party) → `Agent(toolsets=[...])` with `LoggingToolset` /
    `CachingToolset` wrapping when enabled
  - **`BaseToolset` / callables / native `Tool`** → resolved via
    `_resolve_tools` → `Agent(tools=[...])`
- Durable model caching via `CachingModel` in `run_agent`
- `get_model()` replaces direct `get_conn()` usage; `get_conn()` delegates
for backward compatibility
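The two-path split can be pictured with the sketch below. `NativeToolset` and `NeutralToolset` are placeholder classes standing in for pydantic-ai's `AbstractToolset` and the new `BaseToolset`, and `split_toolsets` is a hypothetical helper name; native `Tool` objects and the logging/caching wrapping are omitted.

```python
from typing import Any, Callable


class NativeToolset:
    """Placeholder for a pydantic-ai AbstractToolset subclass."""


class NeutralToolset:
    """Placeholder for the framework-neutral BaseToolset."""

    def as_tools(self) -> list[Callable[..., Any]]:
        return []


def split_toolsets(items: list[Any]) -> tuple[list[Any], list[Any]]:
    """Return (native toolsets, entries that still need tool resolution)."""
    native: list[Any] = []
    to_resolve: list[Any] = []
    for item in items:
        if isinstance(item, NativeToolset):
            # Passed through unchanged, as in Agent(toolsets=[...]).
            native.append(item)
        elif isinstance(item, NeutralToolset) or callable(item):
            # Resolved into individual tools, as in Agent(tools=[...]).
            to_resolve.append(item)
        else:
            raise TypeError(f"Unsupported toolset entry: {type(item).__name__}")
    return native, to_resolve


def lookup_city(name: str) -> str:
    return name.title()


mcp_like = NativeToolset()
sql_like = NeutralToolset()
native, to_resolve = split_toolsets([mcp_like, sql_like, lookup_city])
```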
### `AgentOperator` thinning
Operator execution is now:
```python
request = self._build_request(prompt=self.prompt)
agent = self.llm_hook.create_agent(request)
run_result = self.llm_hook.run_agent(agent, request)
```
The operator has no runtime pydantic-ai imports; `UsageLimits` is imported
only under `TYPE_CHECKING`.
Early validation via `_validate_hook_capabilities()` checks that the resolved
hook supports the requested toolsets, durable execution, and usage limits.
### `SQLToolset` → `BaseToolset`
`SQLToolset` no longer implements pydantic-ai's `AbstractToolset`. It
implements `BaseToolset.as_tools()` returning four `ToolSpec` objects with JSON
schemas (`list_tables`, `get_schema`, `query`, `check_query`).
`HookToolset`, `MCPToolset`, and `DataFusionToolset` remain
`AbstractToolset` and continue to work unchanged through the pydantic-ai
routing path.
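For readers unfamiliar with the `as_tools()` pattern, the sketch below shows a toolset exposing four tool descriptors with JSON schemas. It is an assumption-laden illustration: `SketchToolSpec` and `SketchSQLToolset` are invented names, the field names are guesses, the backend is an in-memory `sqlite3` database rather than an Airflow connection, and the behaviour of each tool (for example using `EXPLAIN` for `check_query`) is not taken from the PR.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SketchToolSpec:
    """Stand-in for ToolSpec: name, description, JSON schema, callable."""

    name: str
    description: str
    parameters: dict[str, Any]
    fn: Callable[..., Any]


def _schema(**props: str) -> dict[str, Any]:
    """Build a JSON schema object in which every property is required."""
    return {
        "type": "object",
        "properties": {key: {"type": kind} for key, kind in props.items()},
        "required": list(props),
    }


class SketchSQLToolset:
    """Stand-in for SQLToolset, backed by sqlite3 for the example."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def list_tables(self) -> list[str]:
        rows = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        )
        return [row[0] for row in rows]

    def get_schema(self, table: str) -> list[tuple[str, str]]:
        # PRAGMA cannot take bound parameters, so only accept known tables.
        if table not in self.list_tables():
            raise ValueError(f"Unknown table: {table}")
        rows = self.conn.execute(f'PRAGMA table_info("{table}")')
        return [(row[1], row[2]) for row in rows]

    def query(self, sql: str) -> list[tuple]:
        return self.conn.execute(sql).fetchall()

    def check_query(self, sql: str) -> bool:
        # EXPLAIN compiles the statement without running it.
        try:
            self.conn.execute(f"EXPLAIN {sql}")
        except sqlite3.Error:
            return False
        return True

    def as_tools(self) -> list[SketchToolSpec]:
        sql_arg = _schema(sql="string")
        return [
            SketchToolSpec("list_tables", "List tables.", _schema(), self.list_tables),
            SketchToolSpec(
                "get_schema", "Describe a table.", _schema(table="string"), self.get_schema
            ),
            SketchToolSpec("query", "Run a query.", sql_arg, self.query),
            SketchToolSpec("check_query", "Validate a query.", sql_arg, self.check_query),
        ]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'ada')")
tools = {spec.name: spec for spec in SketchSQLToolset(conn).as_tools()}
print(tools["query"].fn("SELECT name FROM users"))
```

Because each descriptor carries only a name, a schema, and a plain callable, any backend hook can translate it to its own tool type without importing pydantic-ai.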
---
## Other changes
### All LLM operators migrated
These now use `BaseAIHook.get_agent_hook()` and `AgentRunRequest`:
- `LLMOperator`
- `LLMBranchOperator`
- `LLMSQLOperator`
- `LLMSchemaCompareOperator`
- `LLMFileAnalysisOperator`
- `LLMRetryPolicy`
### Logging utilities
- `log_run_summary()` now accepts `AgentRunResult` directly
- Removed `wrap_toolsets_for_logging()` from the operator path; logging is
handled in the hook layer
### Examples and docs
- Updated `example_pydantic_ai_hook.py` to use `BaseAIHook.get_agent_hook()`
+ `AgentRunRequest`
- Updated `docs/operators/agent.rst`, `docs/toolsets.rst`, `AGENTS.md`
- Changelog entry for the new contract
### Tests
- New `test_base_ai.py` — dataclasses, `_resolve_tools`, logging/caching
wrappers
- Expanded `test_pydantic_ai.py` — contract, durable init, `AbstractToolset`
routing/wrapping
- Updated operator, decorator, and policy tests to mock `BaseAIHook` and
assert `AgentRunRequest` forwarding
- Rewrote `test_sql.py` for `BaseToolset.as_tools()` API
---
## Breaking changes
### `PydanticAIHook` API
**Before:**
```python
agent = hook.create_agent(output_type=str, instructions="...",
toolsets=[...])
result = hook.run_agent(agent, prompt="hello", usage_limits=limits)
```
**After:**
```python
request = AgentRunRequest(prompt="hello", output_type=str,
instructions="...", toolsets=[...], usage_limits=limits)
agent = hook.create_agent(request)
result = hook.run_agent(agent, request)
```
`get_conn()` still works (delegates to `get_model()`).
### `SQLToolset` direct pydantic-ai usage
**Before:** pass `SQLToolset(...)` directly to pydantic-ai
`Agent(toolsets=[...])`.
**After:** use via `AgentOperator` / `@task.agent`, or build through the
hook:
```python
request = AgentRunRequest(prompt="...",
toolsets=[SQLToolset(db_conn_id="my_db")])
agent = hook.create_agent(request)
result = hook.run_agent(agent, request)
```
`SQLToolset` is now a `BaseToolset`, not an `AbstractToolset`.
---
## Migration guide
### Custom code calling `PydanticAIHook` directly
Replace kwargs-style `create_agent` / `run_agent` with `AgentRunRequest`:
```python
from airflow.providers.common.ai.hooks.base_ai import AgentRunRequest, BaseAIHook
from airflow.providers.common.ai.toolsets.sql import SQLToolset
hook = BaseAIHook.get_agent_hook("pydanticai_default",
hook_params={"model_id": "openai:gpt-5"})
request = AgentRunRequest(
prompt="Analyze this dataset",
output_type=str,
instructions="You are a data analyst.",
toolsets=[SQLToolset(db_conn_id="postgres_default")],
)
agent = hook.create_agent(request)
result = hook.run_agent(agent, request)
print(result.output)
```
### DAG authors using operators / decorators
No DAG changes required for:
- `AgentOperator` / `@task.agent`
- `LLMOperator` / `@task.llm`
- Other LLM decorators
Connection `conn_type` continues to select the backend.
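Conceptually, backend selection is a lookup from `conn_type` to a hook class. The sketch below illustrates that idea only: the two dictionaries, the hook class names, and the idea that `pydanticai-bedrock` maps to a separate class are assumptions, and in the provider the mapping comes from hook registration in `provider.yaml` and Airflow's connection store rather than from module-level dicts.

```python
class SketchHook:
    """Hypothetical base for the example hooks below."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id


class PydanticLikeHook(SketchHook):
    """Placeholder for a hook registered under conn_type 'pydanticai'."""


class BedrockLikeHook(SketchHook):
    """Placeholder for a hook registered under 'pydanticai-bedrock'."""


# Assumed registry: conn_type -> hook class.
HOOKS_BY_CONN_TYPE: dict[str, type[SketchHook]] = {
    "pydanticai": PydanticLikeHook,
    "pydanticai-bedrock": BedrockLikeHook,
}

# Stand-in for the connection store: conn_id -> conn_type.
CONNECTIONS: dict[str, str] = {
    "pydanticai_default": "pydanticai",
    "bedrock_default": "pydanticai-bedrock",
}


def get_agent_hook(conn_id: str) -> SketchHook:
    """Resolve the hook class from the connection's conn_type."""
    conn_type = CONNECTIONS[conn_id]
    try:
        hook_cls = HOOKS_BY_CONN_TYPE[conn_type]
    except KeyError:
        raise ValueError(
            f"No agent hook registered for conn_type {conn_type!r}"
        ) from None
    return hook_cls(conn_id)


hook = get_agent_hook("bedrock_default")
print(type(hook).__name__)
```

With this shape, switching a DAG to another backend is a connection change, not a code change.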
### Adding a new agent backend
Subclass `BaseAIHook` and implement:
1. `get_model()`
2. `create_agent(request)`
3. `run_agent(agent, request)`
4. `_tool_spec_to_native(spec)`
Register the hook in `provider.yaml`. Reuse shared helpers
(`_resolve_tools`, `_logged_callable`, `_cached_callable`, `_init_durable`)
where applicable.
---
## Known limitations / follow-ups
- **`ToolSpec.parameters`** are not yet forwarded to pydantic-ai `Tool` in
`_tool_spec_to_native`; schemas may be inferred from bound-method type hints
instead.
- **`AgentRunRequest.prompt`** is typed as `str`; widening to `str |
Sequence[UserContent]` may be needed when merging with multimodal prompt
support ([#67389](https://github.com/apache/airflow/pull/67389)).
- **`HookToolset` / `DataFusionToolset`** could be migrated to `BaseToolset`
in a follow-up; they work today via the `AbstractToolset` pass-through path.
---
## Test plan
- [x] `tests/unit/common/ai/hooks/test_base_ai.py`
- [x] `tests/unit/common/ai/hooks/test_pydantic_ai.py`
- [x] `tests/unit/common/ai/operators/test_agent.py`
- [x] `tests/unit/common/ai/operators/test_llm.py`
- [x] `tests/unit/common/ai/operators/test_llm_branch.py`
- [x] `tests/unit/common/ai/operators/test_llm_sql.py`
- [x] `tests/unit/common/ai/operators/test_llm_schema_compare.py`
- [x] `tests/unit/common/ai/operators/test_llm_file_analysis.py`
- [x] `tests/unit/common/ai/decorators/test_agent.py`
- [x] `tests/unit/common/ai/decorators/test_llm*.py`
- [x] `tests/unit/common/ai/policies/test_retry.py`
- [x] `tests/unit/common/ai/toolsets/test_sql.py`
- [x] `tests/unit/common/ai/utils/test_logging.py`
- Follow-up: **AWS Strands** agent hook (`StrandsAIHook` implementing
`BaseAIHook`)
- Follow-up: **Google ADK** agent hook (same contract, new `conn_type`
registration)
- Follow-up: migrate **`HookToolset`**, **`MCPToolset`**, and
**`DataFusionToolset`** from pydantic-ai `AbstractToolset` to `BaseToolset`
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes
- [ ] No