gopidesupavan opened a new pull request, #67438:
URL: https://github.com/apache/airflow/pull/67438

    # Add BaseAIHook and Update usage
   
   ## Summary
   
   Introduce `BaseAIHook`, a backend-neutral contract for multi-turn LLM agents 
in the `common-ai` provider. `AgentOperator` and `@task.agent` now resolve the 
agent runtime from the connection `conn_type` (for example `pydanticai`, 
`pydanticai-bedrock`, `pydanticai-azure`) and delegate all framework-specific 
work to the hook.
   
   `PydanticAIHook` is the first implementation. All LLM operators and 
`LLMRetryPolicy` are migrated to a shared `AgentRunRequest` / `run_agent` API. 
`SQLToolset` is migrated to the new framework-agnostic `BaseToolset` interface.
   
   This lays the foundation for additional agent backends without adding 
parallel operator classes per framework. A follow-up PR will add **AWS 
Strands** as the next hook implementation; this contract also opens the door 
for **Google ADK** and other agent runtimes behind the same `AgentOperator` / 
`@task.agent` surface.
   
   ---
   
   ## Motivation
   
   Before this change:
   
   - `AgentOperator` contained pydantic-ai-specific logic (tool wrapping, 
durable caching, agent construction).
   - `PydanticAIHook.create_agent()` / `run_agent()` used ad-hoc keyword 
arguments.
   - Tool logging and durable execution were handled in the operator layer via 
`LoggingToolset` / `CachingToolset` wrappers.
   
   The operator should stay framework-agnostic. Hooks should own agent 
lifecycle, tool resolution, durable execution, and normalized results.
   
   ---
   
   ## Design
   
   ### `BaseAIHook` contract
   
   New abstract hook with:
   
   | Method / property | Purpose |
   |-------------------|---------|
   | `get_model()` | Return backend model/client |
   | `get_conn()` | Compatibility shim → `get_model()` |
   | `create_agent(request)` | Build (but do not run) the agent |
   | `run_agent(agent, request)` | Execute and return `AgentRunResult` |
   | `_tool_spec_to_native(spec)` | Convert `ToolSpec` → native tool 
representation |
   | `get_agent_hook(conn_id)` | Resolve hook from connection `conn_type` |
   
   Capability flags: `supports_toolsets`, `supports_durable`, 
`supports_usage_limits`.
   
   ### Parameter objects
   
   - **`AgentRunRequest`** — prompt, output type, instructions, toolsets, usage 
limits, message history, durable context, agent params
   - **`AgentRunResult`** — output, message history, model name, usage, tool 
names, durable stats
   - **`ToolSpec`** — framework-neutral tool descriptor (name, description, 
JSON schema, callable)
   - **`BaseToolset`** — abstract `as_tools() → list[ToolSpec]`
   - **`DurableContext`** / **`DurableStats`** — durable execution identity and 
cache statistics
   
   ### Shared hook helpers
   
   Moved into `BaseAIHook`:
   
   - `_resolve_tools()` — converts `BaseToolset`, plain callables, and native 
tool objects
   - `_logged_callable()` — per-tool real-time logging
   - `_cached_callable()` — per-tool durable step caching
   - `_init_durable()` — `DurableStorage` / `DurableStepCounter` setup
   
   ### `PydanticAIHook` implementation
   
   - Implements full `BaseAIHook` contract
   - Splits toolsets into two paths:
     - **`AbstractToolset`** (`HookToolset`, `MCPToolset`, `DataFusionToolset`, 
third-party) → `Agent(toolsets=[...])` with `LoggingToolset` / `CachingToolset` 
wrapping when enabled
     - **`BaseToolset` / callables / native `Tool`** → resolved via 
`_resolve_tools` → `Agent(tools=[...])`
   - Durable model caching via `CachingModel` in `run_agent`
   - `get_model()` replaces direct `get_conn()` usage; `get_conn()` delegates 
for backward compatibility
   
   ### `AgentOperator` thinning
   
   Operator execution is now:
   
   ```python
   request = self._build_request(prompt=self.prompt)
   agent = self.llm_hook.create_agent(request)
   run_result = self.llm_hook.run_agent(agent, request)
   ```
   
   No pydantic-ai imports at runtime (except `UsageLimits` under 
`TYPE_CHECKING`).
   
   Early validation via `_validate_hook_capabilities()` checks hook support for 
toolsets, durable, and usage limits.
   
   ### `SQLToolset` → `BaseToolset`
   
   `SQLToolset` no longer implements pydantic-ai's `AbstractToolset`. It 
implements `BaseToolset.as_tools()` returning four `ToolSpec` objects with JSON 
schemas (`list_tables`, `get_schema`, `query`, `check_query`).
   
   `HookToolset`, `MCPToolset`, and `DataFusionToolset` remain 
`AbstractToolset` and continue to work unchanged through the pydantic-ai 
routing path.
   
   ---
   
   ## Other changes
   
   ### All LLM operators migrated
   
   These now use `BaseAIHook.get_agent_hook()` and `AgentRunRequest`:
   
   - `LLMOperator`
   - `LLMBranchOperator`
   - `LLMSQLOperator`
   - `LLMSchemaCompareOperator`
   - `LLMFileAnalysisOperator`
   - `LLMRetryPolicy`
   
   ### Logging utilities
   
   - `log_run_summary()` now accepts `AgentRunResult` directly
   - Removed `wrap_toolsets_for_logging()` from the operator path; logging is 
handled in the hook layer
   
   ### Examples and docs
   
   - Updated `example_pydantic_ai_hook.py` to use `BaseAIHook.get_agent_hook()` 
+ `AgentRunRequest`
   - Updated `docs/operators/agent.rst`, `docs/toolsets.rst`, `AGENTS.md`
   - Changelog entry for the new contract
   
   ### Tests
   
   - New `test_base_ai.py` — dataclasses, `_resolve_tools`, logging/caching 
wrappers
   - Expanded `test_pydantic_ai.py` — contract, durable init, `AbstractToolset` 
routing/wrapping
   - Updated operator, decorator, and policy tests to mock `BaseAIHook` and 
assert `AgentRunRequest` forwarding
   - Rewritten `test_sql.py` for `BaseToolset.as_tools()` API
   
   ---
   
   ## Breaking changes
   
   ### `PydanticAIHook` API
   
   **Before:**
   
   ```python
   agent = hook.create_agent(output_type=str, instructions="...", 
toolsets=[...])
   result = hook.run_agent(agent, prompt="hello", usage_limits=limits)
   ```
   
   **After:**
   
   ```python
   request = AgentRunRequest(prompt="hello", output_type=str, 
instructions="...", toolsets=[...], usage_limits=limits)
   agent = hook.create_agent(request)
   result = hook.run_agent(agent, request)
   ```
   
   `get_conn()` still works (delegates to `get_model()`).
   
   ### `SQLToolset` direct pydantic-ai usage
   
   **Before:** pass `SQLToolset(...)` directly to pydantic-ai 
`Agent(toolsets=[...])`.
   
   **After:** use via `AgentOperator` / `@task.agent`, or build through the 
hook:
   
   ```python
   request = AgentRunRequest(prompt="...", 
toolsets=[SQLToolset(db_conn_id="my_db")])
   agent = hook.create_agent(request)
   result = hook.run_agent(agent, request)
   ```
   
   `SQLToolset` is now a `BaseToolset`, not an `AbstractToolset`.
   
   ---
   
   ## Migration guide
   
   ### Custom code calling `PydanticAIHook` directly
   
   Replace kwargs-style `create_agent` / `run_agent` with `AgentRunRequest`:
   
   ```python
   from airflow.providers.common.ai.hooks.base_ai import AgentRunRequest, 
BaseAIHook
   
   hook = BaseAIHook.get_agent_hook("pydanticai_default", 
hook_params={"model_id": "openai:gpt-5"})
   request = AgentRunRequest(
       prompt="Analyze this dataset",
       output_type=str,
       instructions="You are a data analyst.",
       toolsets=[SQLToolset(db_conn_id="postgres_default")],
   )
   agent = hook.create_agent(request)
   result = hook.run_agent(agent, request)
   print(result.output)
   ```
   
   ### DAG authors using operators / decorators
   
   No DAG changes required for:
   
   - `AgentOperator` / `@task.agent`
   - `LLMOperator` / `@task.llm`
   - Other LLM decorators
   
   Connection `conn_type` continues to select the backend.
   
   ### Adding a new agent backend
   
   Subclass `BaseAIHook` and implement:
   
   1. `get_model()`
   2. `create_agent(request)`
   3. `run_agent(agent, request)`
   4. `_tool_spec_to_native(spec)`
   
   Register the hook in `provider.yaml`. Reuse shared helpers 
(`_resolve_tools`, `_logged_callable`, `_cached_callable`, `_init_durable`) 
where applicable.
   
   ---
   
   ## Known limitations / follow-ups
   
   - **`ToolSpec.parameters`** are not yet forwarded to pydantic-ai `Tool` in 
`_tool_spec_to_native`; schemas may be inferred from bound-method type hints 
instead.
   - **`AgentRunRequest.prompt`** is typed as `str`; widening to `str | 
Sequence[UserContent]` may be needed when merging with multimodal prompt 
support ([#67389](https://github.com/apache/airflow/pull/67389)).
   - **`HookToolset` / `DataFusionToolset`** could be migrated to `BaseToolset` 
in a follow-up; they work today via the `AbstractToolset` pass-through path.
   
   ---
   
   ## Test plan
   
   - [x] `tests/unit/common/ai/hooks/test_base_ai.py`
   - [x] `tests/unit/common/ai/hooks/test_pydantic_ai.py`
   - [x] `tests/unit/common/ai/operators/test_agent.py`
   - [x] `tests/unit/common/ai/operators/test_llm.py`
   - [x] `tests/unit/common/ai/operators/test_llm_branch.py`
   - [x] `tests/unit/common/ai/operators/test_llm_sql.py`
   - [x] `tests/unit/common/ai/operators/test_llm_schema_compare.py`
   - [x] `tests/unit/common/ai/operators/test_llm_file_analysis.py`
   - [x] `tests/unit/common/ai/decorators/test_agent.py`
   - [x] `tests/unit/common/ai/decorators/test_llm*.py`
   - [x] `tests/unit/common/ai/policies/test_retry.py`
   - [x] `tests/unit/common/ai/toolsets/test_sql.py`
   - [x] `tests/unit/common/ai/utils/test_logging.py`
   
   
   - Follow-up: **AWS Strands** agent hook (`StrandsAIHook` implementing 
`BaseAIHook`)
   - Follow-up: **Google ADK** agent hook (same contract, new `conn_type` 
registration)
   - Follow-up: migrate **`HookToolset`**, **`MCPToolset`**, and 
**`DataFusionToolset`** from pydantic-ai `AbstractToolset` to `BaseToolset`
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [ ] Yes
   - [ ] No
   
   
   <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   <!--
   Thank you for contributing!
   
   Please provide above a brief description of the changes made in this pull 
request.
   Write a good git commit message following this guide: 
http://chris.beams.io/posts/git-commit/
   
   Please make sure that your code changes are covered with tests.
   And in case of new features or big changes remember to adjust the 
documentation.
   
   Feel free to ping (in general) for the review if you do not see reaction for 
a few days
   (72 Hours is the minimum reaction time you can expect from volunteers) - we 
sometimes miss notifications.
   
   In case of an existing issue, reference it using one of the following:
   
   * closes: #ISSUE
   * related: #ISSUE
   -->
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change below checkbox to `[X]` followed by the name of the tool, uncomment 
the "Generated-by".
   -->
   
   - [ ] Yes (please specify the tool below)
   
   <!--
   Generated-by: [Tool Name] following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   -->
   
   ---
   
   * Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information. Note: commit author/co-author name and email in commits 
become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   * When adding dependency, check compliance with the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes create newsfragment: 
`{pr_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
 You can add this file in a follow-up commit after the PR is created so you 
know the PR number.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to