This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 28a205bc87e Add a bridge to expose `common.ai` toolsets as LangChain tools (#67791)
28a205bc87e is described below

commit 28a205bc87eb9cd076be9d0020ad3179517d8c97
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Jun 1 11:21:29 2026 +0100

    Add a bridge to expose `common.ai` toolsets as LangChain tools (#67791)
    
    * Add a bridge to expose common.ai toolsets as LangChain tools
    
    common.ai's curated toolsets (SQLToolset, HookToolset, MCPToolset) are
    pydantic-ai AbstractToolsets and already work natively with AgentOperator.
    Add the reverse direction: airflow_toolset_to_langchain_tools(toolset)
    converts any of them into LangChain StructuredTool objects, so a LangChain
    agent or chain running inside an Airflow task can call Airflow's
    connection-managed, validated tools.
    
    The forward direction (LangChain tools into AgentOperator) is already covered
    by pydantic-ai's upstream pydantic_ai.ext.langchain.LangChainToolset, so this
    keeps both bridge directions in common.ai rather than introducing a separate
    provider for a single converter function.
    
    * Handle ModelRetry from bridged toolsets so LangChain agents self-correct
    
    Toolsets like SQLToolset raise pydantic-ai's ModelRetry to ask the model to
    correct its input (for example an unknown column). pydantic-ai turns that into a
    retry prompt, but LangChain does not know the type, and under create_agent's
    default tool-error handling any tool exception aborts the run.
    
    Catch ModelRetry in the bridge and return its message as the tool output, so the
    model sees the guidance and tries again. This mirrors ModelRetry's intent -- a
    feed-the-model-and-retry signal, not a failure -- and works regardless of how
    the agent is configured to handle tool errors.
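
The retry handling described above can be sketched with the standard library alone. This is a minimal illustration, not the bridge's code: `RetrySignal` is a hypothetical stand-in for pydantic-ai's `ModelRetry`, and `as_tool_output` and `query` are made-up names for this example.

```python
from typing import Any, Callable


class RetrySignal(Exception):
    """Hypothetical stand-in for pydantic-ai's ModelRetry."""


def as_tool_output(tool: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a tool so a retry signal becomes its return value."""

    def wrapper(**kwargs: Any) -> Any:
        try:
            return tool(**kwargs)
        except RetrySignal as exc:
            # Hand the guidance back to the caller instead of raising,
            # so an agent loop can show it to the model and try again.
            return str(exc)

    return wrapper


def query(column: str) -> str:
    # Toy tool (an assumption for illustration): it knows a single column.
    if column != "id":
        raise RetrySignal(f"unknown column {column!r}; valid columns: id")
    return "42 rows"


safe_query = as_tool_output(query)
```

Only the retry signal is converted into output; any other exception still propagates, so genuine failures stay visible to the caller.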
---
 docs/spelling_wordlist.txt                         |   2 +
 providers/common/ai/docs/toolsets.rst              |  65 +++++++
 .../example_langchain_toolset_bridge.py            |  93 ++++++++++
 .../providers/common/ai/toolsets/__init__.py       |   8 +-
 .../common/ai/toolsets/langchain_bridge.py         | 172 ++++++++++++++++++
 .../common/ai/toolsets/test_langchain_bridge.py    | 193 +++++++++++++++++++++
 6 files changed, 532 insertions(+), 1 deletion(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index d8837585eed..f8e7efca67f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -955,6 +955,7 @@ Kylin
 kylin
 Kyverno
 Lakehouse
+langchain
 LanguageServiceClient
 lastname
 latencies
@@ -1574,6 +1575,7 @@ Stringified
 stringified
 stringify
 Struct
+StructuredTool
 STS
 subchart
 subclassed
diff --git a/providers/common/ai/docs/toolsets.rst b/providers/common/ai/docs/toolsets.rst
index 33fc04f0e18..ee6d8a85b63 100644
--- a/providers/common/ai/docs/toolsets.rst
+++ b/providers/common/ai/docs/toolsets.rst
@@ -407,6 +407,71 @@ resolves sources to local ``SKILL.md`` directories that any loader accepts:
 ``resolve_skills`` needs the Git provider (for ``GitSkills``) but not pydantic-ai,
 and removes any cloned directories when the ``with`` block exits.
 
+Working with LangChain
+----------------------
+
+Tools bridge in both directions between common.ai's toolsets and LangChain.
+
+**LangChain tools → ``AgentOperator``.** No Airflow code is needed. pydantic-ai
+ships `pydantic_ai.ext.langchain.LangChainToolset
+<https://ai.pydantic.dev/toolsets/>`__ upstream, which wraps existing LangChain
+tools as an ``AbstractToolset``. Drop it straight into ``AgentOperator``:
+
+.. code-block:: python
+
+    from pydantic_ai.ext.langchain import LangChainToolset
+
+    AgentOperator(
+        task_id="agent_with_langchain_tools",
+        prompt="Research the question and summarise.",
+        llm_conn_id="pydanticai_default",
+        toolsets=[LangChainToolset([my_langchain_tool])],
+    )
+
+**common.ai toolsets → LangChain.** The reverse direction is what
+:func:`~airflow.providers.common.ai.toolsets.langchain_bridge.airflow_toolset_to_langchain_tools`
+provides. It converts any pydantic-ai toolset -- including ``SQLToolset``,
+``HookToolset``, and ``MCPToolset`` -- into a list of LangChain
+``StructuredTool`` objects, so a LangChain agent or chain can call Airflow's
+curated, connection-managed tools:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_langchain_toolset_bridge.py
+    :language: python
+    :start-after: [START example_langchain_toolset_bridge]
+    :end-before: [END example_langchain_toolset_bridge]
+
+Each generated tool keeps the source tool's name, description, and argument
+schema, and routes calls back through the original toolset, so the toolset's own
+behaviour (connection resolution, ``SQLToolset``'s SQL validation, and
+``allowed_tables`` filtering) still applies. ``get_tools`` runs eagerly at
+conversion time to enumerate the tools.
+
+When a toolset raises pydantic-ai's ``ModelRetry`` to ask the model to correct
+its input (``SQLToolset`` does this on, for example, an unknown column), the
+bridge returns that message as the tool's output so the model sees it and tries
+again. ``ModelRetry`` is a feed-the-model-and-retry signal rather than a
+failure, so returning it preserves the self-correction the toolset was written
+for and works no matter how the agent is configured to handle tool errors
+(raising would abort the run under ``create_agent``'s default handling).
+
+The bridge does not hold a toolset session open across calls: ``get_tools`` and
+every tool call each run under their own event loop, so for ``MCPToolset`` the
+connection is opened and torn down around each call. It reconnects per call,
+which is fine for stateless tools but unsuitable for ``stdio`` MCP servers (or
+any server that keeps state between calls), since each call starts a fresh
+session.
+
+.. note::
+
+    Outside an agent run there is no live ``RunContext``, so the bridge builds a
+    minimal one with an inert placeholder model. The bundled toolsets ignore the
+    context, so this is transparent for them. A custom toolset that reads live
+    run state (``ctx.model``, ``ctx.messages``, ``ctx.usage``) will not behave
+    correctly when bridged standalone.
+
+Requires the ``langchain`` extra:
+``pip install "apache-airflow-providers-common-ai[langchain]"``
+
 
 Security
 --------
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_toolset_bridge.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_toolset_bridge.py
new file mode 100644
index 00000000000..1bfad3c8e8f
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_toolset_bridge.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Expose an Airflow toolset to a LangChain agent (the reverse bridge).
+
+common.ai's curated toolsets (``SQLToolset``, ``HookToolset``, ``MCPToolset``)
+are pydantic-ai toolsets, so ``AgentOperator`` uses them natively. This example
+shows the *reverse* direction: convert a ``SQLToolset`` into LangChain
+``StructuredTool`` objects with
+:func:`~airflow.providers.common.ai.toolsets.langchain_bridge.airflow_toolset_to_langchain_tools`
+and hand them to a LangChain ReAct agent built with ``create_agent``. The agent
+gets Airflow's managed connections and read-only SQL validation for free.
+
+**Forward direction** (LangChain tools -> ``AgentOperator``): no Airflow code
+is needed. pydantic-ai ships ``pydantic_ai.ext.langchain.LangChainToolset``
+upstream, so ``LangChainToolset([my_langchain_tool])`` drops straight into
+``AgentOperator(toolsets=[...])``. See https://ai.pydantic.dev for details.
+
+Before running:
+
+1. Install LangChain: ``pip install "apache-airflow-providers-common-ai[langchain]" langchain-openai``
+2. Create a ``langchain`` connection named ``langchain_default`` (set
+   ``password`` to your API key) for the model.
+3. Create a database connection (``DB_CONN_ID``, default ``sql_default``) whose
+   hook is a ``DbApiHook`` (e.g. SQLite, Postgres, MySQL).
+"""
+
+from __future__ import annotations
+
+import os
+
+from airflow.providers.common.compat.sdk import dag, task
+
+LLM_CONN_ID = os.environ.get("LLM_CONN_ID", "langchain_default")
+LLM_MODEL = os.environ.get("LLM_MODEL", "openai:gpt-4o")
+DB_CONN_ID = os.environ.get("DB_CONN_ID", "sql_default")
+
+DEFAULT_QUESTION = "Which tables exist, and how many rows does each contain?"
+
+
+# [START example_langchain_toolset_bridge]
+@dag(tags=["example"])
+def example_langchain_toolset_bridge():
+    """Run a LangChain SQL agent backed by Airflow's curated ``SQLToolset``."""
+
+    @task
+    def run_sql_agent(question: str = DEFAULT_QUESTION) -> str:
+        from langchain.agents import create_agent
+
+        from airflow.providers.common.ai.hooks.langchain import LangChainHook
+        from airflow.providers.common.ai.toolsets import airflow_toolset_to_langchain_tools
+        from airflow.providers.common.ai.toolsets.sql import SQLToolset
+
+        # Airflow's curated, read-only SQL toolset, exposed as LangChain tools.
+        # The bridge carries each tool's name, description, and args schema, and
+        # routes calls back through SQLToolset (connection resolution + SQL
+        # validation included).
+        tools = airflow_toolset_to_langchain_tools(SQLToolset(db_conn_id=DB_CONN_ID))
+
+        model = LangChainHook(llm_conn_id=LLM_CONN_ID, llm_model=LLM_MODEL).get_chat_model()
+        agent = create_agent(
+            model,
+            tools=tools,
+            system_prompt=(
+                "You are a SQL analyst. Use list_tables and get_schema to explore "
+                "the database, then run read-only queries to answer the question."
+            ),
+        )
+
+        result = agent.invoke({"messages": [{"role": "user", "content": question}]})
+        return result["messages"][-1].content
+
+    run_sql_agent()
+
+
+# [END example_langchain_toolset_bridge]
+
+
+example_langchain_toolset_bridge()
diff --git a/providers/common/ai/src/airflow/providers/common/ai/toolsets/__init__.py b/providers/common/ai/src/airflow/providers/common/ai/toolsets/__init__.py
index 957531e71ef..6c30fa4a733 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/toolsets/__init__.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/toolsets/__init__.py
@@ -20,10 +20,16 @@ from __future__ import annotations
 
 from airflow.providers.common.ai.toolsets.hook import HookToolset
 
-__all__ = ["HookToolset", "MCPToolset", "SQLToolset"]
+__all__ = ["HookToolset", "MCPToolset", "SQLToolset", "airflow_toolset_to_langchain_tools"]
 
 
 def __getattr__(name: str):
+    if name == "airflow_toolset_to_langchain_tools":
+        from airflow.providers.common.ai.toolsets.langchain_bridge import (
+            airflow_toolset_to_langchain_tools,
+        )
+
+        return airflow_toolset_to_langchain_tools
     if name == "SQLToolset":
         try:
             from airflow.providers.common.ai.toolsets.sql import SQLToolset
diff --git a/providers/common/ai/src/airflow/providers/common/ai/toolsets/langchain_bridge.py b/providers/common/ai/src/airflow/providers/common/ai/toolsets/langchain_bridge.py
new file mode 100644
index 00000000000..3f5762679ae
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/toolsets/langchain_bridge.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Bridge pydantic-ai toolsets into LangChain tools.
+
+This is the reverse of pydantic-ai's upstream ``pydantic_ai.ext.langchain``
+bridge. Upstream turns LangChain tools *into* a pydantic-ai toolset
+(:class:`~pydantic_ai.ext.langchain.LangChainToolset`) so they can be used with
+common.ai's ``AgentOperator``. This module goes the other way: it turns a
+pydantic-ai :class:`~pydantic_ai.toolsets.abstract.AbstractToolset` -- such as
+common.ai's :class:`~airflow.providers.common.ai.toolsets.sql.SQLToolset`,
+:class:`~airflow.providers.common.ai.toolsets.hook.HookToolset`, or
+:class:`~airflow.providers.common.ai.toolsets.mcp.MCPToolset` -- into a list of
+LangChain ``StructuredTool`` objects, so Airflow's curated tools can be handed
+to a LangChain agent or chain.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import concurrent.futures
+from typing import TYPE_CHECKING, Any
+
+from pydantic_ai import RunContext
+from pydantic_ai.exceptions import ModelRetry
+from pydantic_ai.models.test import TestModel
+from pydantic_ai.usage import RunUsage
+
+if TYPE_CHECKING:
+    from collections.abc import Coroutine
+
+    from langchain_core.tools import StructuredTool
+    from pydantic_ai.toolsets.abstract import AbstractToolset, ToolsetTool
+
+
+def _run_coro_sync(coro: Coroutine[Any, Any, Any]) -> Any:
+    """
+    Run an awaitable to completion from synchronous code.
+
+    LangChain's ``StructuredTool.func`` is synchronous and is what an Airflow
+    ``@task`` calls, but a pydantic-ai toolset's ``get_tools`` / ``call_tool``
+    are coroutines. When no event loop is running we drive the coroutine with
+    :func:`asyncio.run`; if one is already running in this thread (an async
+    caller) we run it in a worker thread to avoid nesting loops.
+    """
+    try:
+        asyncio.get_running_loop()
+    except RuntimeError:
+        return asyncio.run(coro)
+    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+        return pool.submit(asyncio.run, coro).result()
+
+
+def airflow_toolset_to_langchain_tools(
+    toolset: AbstractToolset[Any],
+    *,
+    deps: Any = None,
+) -> list[StructuredTool]:
+    """
+    Convert a pydantic-ai toolset into a list of LangChain ``StructuredTool`` objects.
+
+    Each returned tool is backed by ``toolset.call_tool`` and carries the
+    ``args_schema`` derived from the tool's JSON schema, so a LangChain agent or
+    chain can call it the same way it calls any native LangChain tool.
+
+    If a tool raises pydantic-ai's :exc:`~pydantic_ai.exceptions.ModelRetry`
+    (the bundled SQL toolsets do this to ask the model to correct its input,
+    e.g. an unknown column), the bridge returns the retry message as the tool's
+    output so the model sees it and tries again. ``ModelRetry`` is a
+    feed-the-model-and-retry signal, not a failure; returning it mirrors that and
+    works regardless of how the agent handles tool errors. Raising instead would
+    abort the run under ``create_agent``'s default tool-error handling.
+
+    The toolset's ``get_tools`` is invoked eagerly here to enumerate the tools.
+
+    .. warning::
+        The bridge does not hold a toolset session open across calls. ``get_tools``
+        and every ``call_tool`` each run under their own event loop, and pydantic-ai
+        opens and tears the connection down around each one. For ``MCPToolset`` this
+        means the server is reconnected on every tool call. That is fine for
+        stateless tools (and for HTTP/SSE servers, modulo per-call latency), but an
+        ``MCPServerStdio`` server, or any server that keeps state between calls,
+        will lose that state because each call starts a fresh process/session.
+
+    .. note::
+        A pydantic-ai toolset is normally driven inside an agent run, where a
+        live :class:`~pydantic_ai.RunContext` carries the model, usage, and
+        message history. Outside an agent run there is no such context, so this
+        bridge builds a minimal one with an inert placeholder model. The curated
+        common.ai toolsets (``SQLToolset``, ``HookToolset``, ``MCPToolset``)
+        ignore the context, so this works for them. A custom toolset that reads
+        live run state (``ctx.model``, ``ctx.messages``, ``ctx.usage``) will not
+        behave correctly when bridged standalone.
+
+    :param toolset: The pydantic-ai toolset to convert.
+    :param deps: Optional dependency object exposed to the toolset as
+        ``ctx.deps``. Defaults to ``None``.
+    :return: A list of LangChain ``StructuredTool`` objects, one per tool in the
+        toolset.
+    """
+    try:
+        from langchain_core.tools import StructuredTool
+    except ImportError as e:
+        from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+        raise AirflowOptionalProviderFeatureException(e)
+
+    # An inert placeholder context. The curated common.ai toolsets ignore it;
+    # TestModel satisfies RunContext's required `model` field without reaching a
+    # real LLM (the bridge never runs the model, only the tools).
+    ctx: RunContext[Any] = RunContext(deps=deps, model=TestModel(), usage=RunUsage())
+
+    toolset_tools = _run_coro_sync(toolset.get_tools(ctx))
+
+    return [
+        _build_structured_tool(toolset, name, toolset_tool, ctx, StructuredTool)
+        for name, toolset_tool in toolset_tools.items()
+    ]
+
+
+def _build_structured_tool(
+    toolset: AbstractToolset[Any],
+    name: str,
+    toolset_tool: ToolsetTool[Any],
+    ctx: RunContext[Any],
+    structured_tool_cls: type[StructuredTool],
+) -> StructuredTool:
+    """Build a single LangChain ``StructuredTool`` from one pydantic-ai tool."""
+    tool_def = toolset_tool.tool_def
+
+    def _validate(kwargs: dict[str, Any]) -> dict[str, Any]:
+        # Mirrors what pydantic-ai's ToolManager does before dispatch, which the
+        # bridge bypasses. A passthrough validator (the bundled toolsets) returns
+        # the args unchanged; a typed one coerces them (e.g. "5" -> 5).
+        return toolset_tool.args_validator.validate_python(kwargs)
+
+    def _sync_call(**kwargs: Any) -> Any:
+        try:
+            return _run_coro_sync(toolset.call_tool(name, _validate(kwargs), ctx, toolset_tool))
+        except ModelRetry as e:
+            # ModelRetry is a "feed this back to the model and retry" signal, not a
+            # failure. Return the message as the tool output so the model self-corrects
+            # (see docstring); raising would abort under create_agent's default handling.
+            return str(e)
+
+    async def _async_call(**kwargs: Any) -> Any:
+        try:
+            return await toolset.call_tool(name, _validate(kwargs), ctx, toolset_tool)
+        except ModelRetry as e:
+            return str(e)
+
+    return structured_tool_cls.from_function(
+        func=_sync_call,
+        coroutine=_async_call,
+        name=name,
+        description=tool_def.description or name,
+        args_schema=tool_def.parameters_json_schema,
+    )
diff --git a/providers/common/ai/tests/unit/common/ai/toolsets/test_langchain_bridge.py b/providers/common/ai/tests/unit/common/ai/toolsets/test_langchain_bridge.py
new file mode 100644
index 00000000000..6187f4e9d4c
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/toolsets/test_langchain_bridge.py
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import sys
+from typing import Any
+
+import pytest
+
+pytest.importorskip("langchain_core")
+
+from pydantic_ai.exceptions import ModelRetry
+from pydantic_ai.tools import ToolDefinition
+from pydantic_ai.toolsets.abstract import AbstractToolset, ToolsetTool
+from pydantic_core import SchemaValidator, core_schema
+
+from airflow.providers.common.ai.toolsets.langchain_bridge import airflow_toolset_to_langchain_tools
+
+_PASSTHROUGH = SchemaValidator(core_schema.any_schema())
+# Coerces the ``n`` field to int so we can assert the args_validator runs.
+_INT_VALIDATOR = SchemaValidator(
+    core_schema.typed_dict_schema({"n": core_schema.typed_dict_field(core_schema.int_schema())})
+)
+
+_ECHO_SCHEMA: dict[str, Any] = {
+    "type": "object",
+    "properties": {"text": {"type": "string", "description": "Text to echo."}},
+    "required": ["text"],
+}
+_ADD_ONE_SCHEMA: dict[str, Any] = {
+    "type": "object",
+    "properties": {"n": {"type": "integer", "description": "A number."}},
+    "required": ["n"],
+}
+_BOOM_SCHEMA: dict[str, Any] = {"type": "object", "properties": {}}
+
+
+class FakeToolset(AbstractToolset[None]):
+    """Minimal toolset with three tools, recording the args each tool is called with."""
+
+    def __init__(self) -> None:
+        self.calls: list[tuple[str, dict[str, Any]]] = []
+
+    @property
+    def id(self) -> str:
+        return "fake"
+
+    async def get_tools(self, ctx) -> dict[str, ToolsetTool[None]]:
+        return {
+            "echo": ToolsetTool(
+                toolset=self,
+                tool_def=ToolDefinition(
+                    name="echo", description="Echo the text back.", parameters_json_schema=_ECHO_SCHEMA
+                ),
+                max_retries=1,
+                args_validator=_PASSTHROUGH,
+            ),
+            "add_one": ToolsetTool(
+                toolset=self,
+                tool_def=ToolDefinition(
+                    name="add_one", description="Add one to n.", parameters_json_schema=_ADD_ONE_SCHEMA
+                ),
+                max_retries=1,
+                args_validator=_INT_VALIDATOR,
+            ),
+            "boom": ToolsetTool(
+                toolset=self,
+                tool_def=ToolDefinition(
+                    name="boom",
+                    description="Always asks the model to retry.",
+                    parameters_json_schema=_BOOM_SCHEMA,
+                ),
+                max_retries=1,
+                args_validator=_PASSTHROUGH,
+            ),
+        }
+
+    async def call_tool(self, name, tool_args, ctx, tool) -> Any:
+        self.calls.append((name, tool_args))
+        if name == "echo":
+            return f"echo: {tool_args['text']}"
+        if name == "add_one":
+            return tool_args["n"] + 1
+        if name == "boom":
+            raise ModelRetry("fix your input and try again")
+        raise ValueError(name)
+
+
+class TestAirflowToolsetToLangChainTools:
+    def test_returns_one_tool_per_toolset_tool(self):
+        tools = airflow_toolset_to_langchain_tools(FakeToolset())
+
+        assert {t.name for t in tools} == {"echo", "add_one", "boom"}
+
+    def test_carries_description_and_args_schema(self):
+        tools = {t.name: t for t in airflow_toolset_to_langchain_tools(FakeToolset())}
+
+        echo = tools["echo"]
+        assert echo.description == "Echo the text back."
+        # ``args`` is derived from the tool's parameters_json_schema.
+        assert "text" in echo.args
+        assert echo.args["text"]["type"] == "string"
+
+    def test_sync_invoke_calls_through_to_toolset(self):
+        toolset = FakeToolset()
+        echo = {t.name: t for t in airflow_toolset_to_langchain_tools(toolset)}["echo"]
+
+        result = echo.invoke({"text": "hi"})
+
+        assert result == "echo: hi"
+        assert toolset.calls == [("echo", {"text": "hi"})]
+
+    def test_args_validator_coerces_before_call(self):
+        toolset = FakeToolset()
+        add_one = {t.name: t for t in airflow_toolset_to_langchain_tools(toolset)}["add_one"]
+
+        # LangChain passes the raw value through; the toolset's validator coerces
+        # the string "5" to the int 5 before call_tool sees it.
+        result = add_one.invoke({"n": "5"})
+
+        assert result == 6
+        assert toolset.calls == [("add_one", {"n": 5})]
+
+    def test_async_invoke_calls_through_to_toolset(self):
+        toolset = FakeToolset()
+        echo = {t.name: t for t in airflow_toolset_to_langchain_tools(toolset)}["echo"]
+
+        result = asyncio.run(echo.ainvoke({"text": "yo"}))
+
+        assert result == "echo: yo"
+        assert toolset.calls == [("echo", {"text": "yo"})]
+
+    def test_model_retry_returned_as_tool_output_sync(self):
+        # ModelRetry is a "retry with this guidance" signal, not a failure: the
+        # bridge returns the message as the tool output so the model self-corrects
+        # rather than aborting the agent run.
+        boom = {t.name: t for t in airflow_toolset_to_langchain_tools(FakeToolset())}["boom"]
+
+        assert boom.invoke({}) == "fix your input and try again"
+
+    def test_model_retry_returned_as_tool_output_async(self):
+        boom = {t.name: t for t in airflow_toolset_to_langchain_tools(FakeToolset())}["boom"]
+
+        assert asyncio.run(boom.ainvoke({})) == "fix your input and try again"
+
+    def test_deps_are_exposed_on_the_run_context(self):
+        sentinel = object()
+        captured: dict[str, Any] = {}
+
+        class DepsToolset(FakeToolset):
+            async def get_tools(self, ctx) -> dict[str, ToolsetTool[None]]:
+                captured["deps"] = ctx.deps
+                return await super().get_tools(ctx)
+
+        airflow_toolset_to_langchain_tools(DepsToolset(), deps=sentinel)
+
+        assert captured["deps"] is sentinel
+
+    def test_missing_langchain_raises_optional_feature_exception(self, monkeypatch):
+        from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+        # Setting the module to None in sys.modules makes the lazy import inside
+        # the bridge raise ImportError, exercising the optional-dependency path.
+        monkeypatch.setitem(sys.modules, "langchain_core.tools", None)
+
+        with pytest.raises(AirflowOptionalProviderFeatureException):
+            airflow_toolset_to_langchain_tools(FakeToolset())
+
+
+class TestSQLToolsetConversion:
+    def test_sql_toolset_exposes_its_four_tools(self):
+        # get_tools / construction do not touch the database, so no connection
+        # is needed to enumerate the tools.
+        sql = pytest.importorskip("airflow.providers.common.ai.toolsets.sql")
+
+        tools = airflow_toolset_to_langchain_tools(sql.SQLToolset(db_conn_id="db"))
+
+        assert {t.name for t in tools} == {"list_tables", "get_schema", "query", "check_query"}
