Copilot commented on code in PR #67438:
URL: https://github.com/apache/airflow/pull/67438#discussion_r3344998891


##########
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py:
##########
@@ -215,86 +218,53 @@ def __init__(
             )
 
     @cached_property
-    def llm_hook(self) -> PydanticAIHook:
-        """Return PydanticAIHook for the configured LLM connection."""
+    def llm_hook(self) -> BaseAIHook:
+        """Return the agent hook for the configured connection (resolved from 
``conn_type``)."""
         hook_params = {
             "model_id": self.model_id,
         }
-        return PydanticAIHook.get_hook(self.llm_conn_id, 
hook_params=hook_params)
-
-    def _build_agent(self) -> Agent[None, Any]:
-        """Build and return a pydantic-ai Agent from the operator's config."""
-        extra_kwargs = dict(self.agent_params)
-        if self.toolsets:
-            toolsets = self.toolsets
-            if self.durable and self._durable_storage is not None and 
self._durable_counter is not None:
-                toolsets = self._build_durable_toolsets(
-                    toolsets, self._durable_storage, self._durable_counter
-                )
-            if self.enable_tool_logging:
-                toolsets = wrap_toolsets_for_logging(toolsets, self.log)
-            extra_kwargs["toolsets"] = toolsets
-        return self.llm_hook.create_agent(
-            output_type=self.output_type,
-            instructions=self.system_prompt,
-            **extra_kwargs,
-        )
-
-    def _build_durable_toolsets(
-        self, toolsets: list[AbstractToolset], storage: DurableStorage, 
counter: DurableStepCounter
-    ) -> list[AbstractToolset]:
-        """Wrap each toolset with CachingToolset for durable execution."""
-        from airflow.providers.common.ai.durable.caching_toolset import 
CachingToolset
-
-        return [CachingToolset(wrapped=ts, storage=storage, counter=counter) 
for ts in toolsets]
-
-    def execute(self, context: Context) -> Any:
-        if self.enable_hitl_review and not isinstance(self.prompt, str):
-            raise TypeError(
-                f"{type(self).__name__}: enable_hitl_review=True is not 
supported "
-                f"with a non-string prompt (got {type(self.prompt).__name__}). 
"
-                f"The HITL session model requires a string prompt. Return a 
str "
-                f"prompt, or disable enable_hitl_review."
-            )
-
-        self._durable_storage = None
-        self._durable_counter = None
-
-        if self.durable:
-            from airflow.providers.common.ai.durable.step_counter import 
DurableStepCounter
-            from airflow.providers.common.ai.durable.storage import 
DurableStorage
-
-            ti = context["task_instance"]
-            self._durable_storage = DurableStorage(
+        return BaseAIHook.get_agent_hook(self.llm_conn_id, 
hook_params=hook_params)
+
+    def _build_request(self, *, prompt: str, message_history: Any = None) -> AgentRunRequest:

Review Comment:
   `_build_request()` annotates `prompt` as `str`, but `AgentRunRequest.prompt` 
(and the decorator's validation) also accepts multimodal 
`Sequence[UserContent]` prompts. This mismatch makes the internal helper 
misleading and breaks static typing on the decorator path, which forwards a 
`Sequence` prompt into `AgentOperator.execute()`. Consider widening the 
annotation so that it matches `AgentRunRequest.prompt`.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,415 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any, ClassVar, Generic, TypeVar
+
+from airflow.providers.common.ai.utils.callables import is_async_callable
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+AgentT = TypeVar("AgentT")
+
+
+class Capability(str, Enum):
+    """
+    Capability tokens declared by concrete hook classes.
+
+    A hook advertises its support by including the relevant tokens in its
+    :attr:`BaseAIHook.capabilities` frozenset.
+    :meth:`BaseAIHook.validate_run_request` rejects requests that use a
+    feature whose token is absent.
+    """
+
+    TOOLSETS = "toolsets"
+    USAGE_LIMITS = "usage_limits"
+    DURABLE = "durable"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int | None = None
+    tool_calls: int | None = None
+    input_tokens: int | None = None
+    output_tokens: int | None = None
+    total_tokens: int | None = None
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports. This contract is currently validated 
by
+    the pydantic-ai hook family and may evolve as more framework backends are 
added.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type or backend-specific 
JSON schema
+        mapping (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.

Review Comment:
   The `AgentRunRequest` docstring describes `toolsets` as a list of 
`BaseToolset` instances only, but the contract and `_resolve_tools()` also 
accept plain callables and backend-native tool objects. This mismatch can 
mislead hook and tool authors about what they are allowed to pass. Consider 
updating the `:param toolsets:` description to list every accepted form.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to