This is an automated email from the ASF dual-hosted git repository. vikramkoka pushed a commit to branch aip99-crewai in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cd2c2aa865fc6186cff0e443177d9da170cd5912 Author: Vikram Koka <[email protected]> AuthorDate: Tue May 19 16:49:01 2026 +0100 Add CrewAI framework hook to common.ai provider - Add CrewAIHook(BaseHook) that bridges Airflow connections to CrewAI's LLM constructor, reusing the existing pydanticai connection type — users configure one connection for PydanticAI, LlamaIndex, LangChain, and CrewAI - Hook provides _resolve_connection_kwargs() (password → api_key, host → base_url) and get_llm() returning a crewai.LLM instance with credentials injected via constructor (no env var mutation) - Register CrewAI integration and hook in provider.yaml, add crewai>=1.14.0 optional dependency, add docs with usage example New files - hooks/crewai.py — CrewAIHook implementation (~80 lines) - tests/unit/common/ai/hooks/test_crewai.py — 10 unit tests covering init, connection extraction, and get_llm() - docs/hooks/crewai.rst — Hook docs with Agent/Crew/Task usage example, connection config, parameter table Design notes Follows the exact pattern established by the LangChain hook. Simpler surface area — no embed_conn_id or get_embedding_model() since CrewAI doesn't need separate embedding models (LiteLLM handles model routing internally). --- providers/common/ai/docs/hooks/crewai.rst | 99 +++++++++++++++ providers/common/ai/docs/index.rst | 1 + providers/common/ai/provider.yaml | 6 + providers/common/ai/pyproject.toml | 3 + .../airflow/providers/common/ai/hooks/crewai.py | 79 ++++++++++++ .../ai/tests/unit/common/ai/hooks/test_crewai.py | 136 +++++++++++++++++++++ 6 files changed, 324 insertions(+) diff --git a/providers/common/ai/docs/hooks/crewai.rst b/providers/common/ai/docs/hooks/crewai.rst new file mode 100644 index 00000000000..f0edf94b8ca --- /dev/null +++ b/providers/common/ai/docs/hooks/crewai.rst @@ -0,0 +1,99 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/hook:crewai: + +``CrewAIHook`` +============== + +Use :class:`~airflow.providers.common.ai.hooks.crewai.CrewAIHook` +to bridge Airflow connections to CrewAI model constructors. The hook +extracts credentials from an Airflow connection and returns a configured +``crewai.LLM`` instance that can be passed directly to CrewAI agents. + +The hook reuses the ``pydanticai`` connection type, so users configure a +single connection for PydanticAI operators, LlamaIndex tasks, LangChain +tasks, and CrewAI tasks. + +.. seealso:: + :ref:`Connection configuration <howto/connection:pydanticai>` + +Basic Usage +----------- + +Use the hook in a ``@task`` function to get a configured LLM for CrewAI +agents: + +.. 
code-block:: python + + from airflow.providers.common.ai.hooks.crewai import CrewAIHook + from airflow.sdk import task + @task + def run_crew(topic: str) -> str: + hook = CrewAIHook(llm_conn_id="pydanticai_default", llm_model="openai/gpt-4o") + llm = hook.get_llm() + + from crewai import Agent, Crew, Task + + researcher = Agent( + role="Researcher", + goal=f"Research {topic}", + backstory="You are an expert researcher.", + llm=llm, + ) + research_task = Task( + description=f"Research the latest developments in {topic}.", + expected_output="A summary of key findings.", + agent=researcher, + ) + crew = Crew(agents=[researcher], tasks=[research_task]) + result = crew.kickoff() + return result.raw + +Connection Configuration +------------------------ + +The hook reads credentials from the Airflow connection: + +- **password** -- API key (passed as ``api_key`` to ``crewai.LLM``) +- **host** -- Base URL (passed as ``base_url``; optional, for custom + endpoints) + +Parameters +---------- + +.. list-table:: + :header-rows: 1 + :widths: 25 15 60 + + * - Parameter + - Default + - Description + * - ``llm_conn_id`` + - ``pydanticai_default`` + - Airflow connection ID for the LLM provider. + * - ``llm_model`` + - ``None`` + - Model name in LiteLLM format (e.g. ``openai/gpt-4o``). + Required for ``get_llm()``.
+ +Dependencies +------------ + +Install the ``crewai`` extra to use this hook:: + + pip install apache-airflow-providers-common-ai[crewai] diff --git a/providers/common/ai/docs/index.rst b/providers/common/ai/docs/index.rst index e96ba4cfd27..6c5d39c6f66 100644 --- a/providers/common/ai/docs/index.rst +++ b/providers/common/ai/docs/index.rst @@ -37,6 +37,7 @@ Connection types <connections/pydantic_ai> MCP connection <connections/mcp> Hooks <hooks/pydantic_ai> + CrewAI Hook <hooks/crewai> Toolsets <toolsets> Operators <operators/index> HITL Review <hitl_review> diff --git a/providers/common/ai/provider.yaml b/providers/common/ai/provider.yaml index 2a13392ea99..2e7742a4246 100644 --- a/providers/common/ai/provider.yaml +++ b/providers/common/ai/provider.yaml @@ -48,6 +48,9 @@ integrations: - integration-name: MCP Server external-doc-url: https://modelcontextprotocol.io/ tags: [ai] + - integration-name: CrewAI + external-doc-url: https://docs.crewai.com/ + tags: [ai] hooks: - integration-name: Pydantic AI @@ -56,6 +59,9 @@ hooks: - integration-name: MCP Server python-modules: - airflow.providers.common.ai.hooks.mcp + - integration-name: CrewAI + python-modules: + - airflow.providers.common.ai.hooks.crewai plugins: - name: hitl_review diff --git a/providers/common/ai/pyproject.toml b/providers/common/ai/pyproject.toml index 57ba93f7461..8d21a315c9b 100644 --- a/providers/common/ai/pyproject.toml +++ b/providers/common/ai/pyproject.toml @@ -95,6 +95,9 @@ dependencies = [ "common.sql" = [ "apache-airflow-providers-common-sql" ] +"crewai" = [ + "crewai>=1.14.0", +] [dependency-groups] dev = [ diff --git a/providers/common/ai/src/airflow/providers/common/ai/hooks/crewai.py b/providers/common/ai/src/airflow/providers/common/ai/hooks/crewai.py new file mode 100644 index 00000000000..b88b80f0c96 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/hooks/crewai.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Hook for CrewAI integration with Airflow connections.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseHook + +if TYPE_CHECKING: + from crewai import LLM + + +class CrewAIHook(BaseHook): + """ + Bridge Airflow connections to CrewAI model constructors. + + Reuses the ``pydanticai`` connection type so users configure a single + connection for PydanticAI operators, LlamaIndex tasks, LangChain tasks, + and CrewAI tasks. + + :param llm_conn_id: Airflow connection ID for the LLM provider. + :param llm_model: Model name in LiteLLM format (e.g. ``openai/gpt-4o``, + ``anthropic/claude-sonnet-4-20250514``). Required for :meth:`get_llm`.
+ """ + + conn_name_attr = "llm_conn_id" + default_conn_name = "pydanticai_default" + conn_type = "pydanticai" # reuse the existing pydanticai connection type; no CrewAI-specific type + hook_name = "CrewAI" + + def __init__( + self, + llm_conn_id: str = "pydanticai_default", + llm_model: str | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.llm_conn_id = llm_conn_id + self.llm_model = llm_model + + def _resolve_connection_kwargs(self, conn_id: str) -> dict[str, Any]: + """Map the connection's ``password``/``host`` to ``api_key``/``base_url``, skipping empty fields.""" + conn = self.get_connection(conn_id) + kwargs: dict[str, Any] = {} + if conn.password: + kwargs["api_key"] = conn.password + if conn.host: + kwargs["base_url"] = conn.host + return kwargs + + def get_llm(self) -> LLM: + """ + Return a ``crewai.LLM`` for ``llm_model`` using credentials from ``llm_conn_id``. + + :raises ValueError: If ``llm_model`` is not set on the hook. + """ + if not self.llm_model: + raise ValueError("llm_model must be set to use get_llm()") + + from crewai import LLM # deferred: crewai is an optional extra, imported at module level only under TYPE_CHECKING + + conn_kwargs = self._resolve_connection_kwargs(self.llm_conn_id) + return LLM(model=self.llm_model, **conn_kwargs) diff --git a/providers/common/ai/tests/unit/common/ai/hooks/test_crewai.py b/providers/common/ai/tests/unit/common/ai/hooks/test_crewai.py new file mode 100644 index 00000000000..70eb37cfa71 --- /dev/null +++ b/providers/common/ai/tests/unit/common/ai/hooks/test_crewai.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from airflow.providers.common.ai.hooks.crewai import CrewAIHook + + +class TestCrewAIHookInit: + def test_default_params(self): + hook = CrewAIHook() + assert hook.llm_conn_id == "pydanticai_default" + assert hook.llm_model is None + + def test_custom_params(self): + hook = CrewAIHook(llm_conn_id="my_conn", llm_model="openai/gpt-4o") + assert hook.llm_conn_id == "my_conn" + assert hook.llm_model == "openai/gpt-4o" + + def test_conn_type_is_pydanticai(self): + assert CrewAIHook.conn_type == "pydanticai" + assert CrewAIHook.default_conn_name == "pydanticai_default" + + +class TestResolveConnectionKwargs: + @patch.object(CrewAIHook, "get_connection") + def test_extracts_password_as_api_key(self, mock_get_conn): + mock_conn = MagicMock() + mock_conn.password = "sk-test-key" + mock_conn.host = "" # empty host must not produce a base_url entry + mock_get_conn.return_value = mock_conn + + hook = CrewAIHook() + result = hook._resolve_connection_kwargs("test_conn") + + assert result == {"api_key": "sk-test-key"} + + @patch.object(CrewAIHook, "get_connection") + def test_extracts_host_as_base_url(self, mock_get_conn): + mock_conn = MagicMock() + mock_conn.password = "" # empty password must not produce an api_key entry + mock_conn.host = "https://custom.api.com" + mock_get_conn.return_value = mock_conn + + hook = CrewAIHook() + result = hook._resolve_connection_kwargs("test_conn") + + assert result == {"base_url": "https://custom.api.com"} + + @patch.object(CrewAIHook, "get_connection") + def test_both_password_and_host(self, mock_get_conn): + mock_conn =
MagicMock() + mock_conn.password = "sk-key" + mock_conn.host = "https://api.example.com" + mock_get_conn.return_value = mock_conn + + hook = CrewAIHook() + result = hook._resolve_connection_kwargs("test_conn") + + assert result == {"api_key": "sk-key", "base_url": "https://api.example.com"} + + @patch.object(CrewAIHook, "get_connection") + def test_empty_fields_return_empty_dict(self, mock_get_conn): + mock_conn = MagicMock() + mock_conn.password = "" + mock_conn.host = "" + mock_get_conn.return_value = mock_conn + + hook = CrewAIHook() + result = hook._resolve_connection_kwargs("test_conn") + + assert result == {} # falsy fields are skipped rather than passed as empty strings + + +def _make_mock_crewai_module(): # returns (stand-in ``crewai`` module, stand-in ``crewai.LLM`` class) + mock_module = MagicMock() + mock_cls = MagicMock() + mock_module.LLM = mock_cls + return mock_module, mock_cls + + +class TestGetLlm: + def test_raises_without_llm_model(self): + hook = CrewAIHook() # no get_connection patch needed: the llm_model check runs before any lookup + with pytest.raises(ValueError, match="llm_model must be set"): + hook.get_llm() + + @patch.object(CrewAIHook, "get_connection") + def test_returns_crewai_llm(self, mock_get_conn): + mock_conn = MagicMock() + mock_conn.password = "sk-test" + mock_conn.host = "" + mock_get_conn.return_value = mock_conn + + mock_module, mock_cls = _make_mock_crewai_module() + + hook = CrewAIHook(llm_model="openai/gpt-4o") + with patch.dict("sys.modules", {"crewai": mock_module}): # get_llm imports crewai lazily, so this stub intercepts it + result = hook.get_llm() + + mock_cls.assert_called_once_with(model="openai/gpt-4o", api_key="sk-test") + assert result == mock_cls.return_value + + @patch.object(CrewAIHook, "get_connection") + def test_passes_base_url(self, mock_get_conn): + mock_conn = MagicMock() + mock_conn.password = "sk-test" + mock_conn.host = "https://custom.api.com" + mock_get_conn.return_value = mock_conn + + mock_module, mock_cls = _make_mock_crewai_module() + + hook = CrewAIHook(llm_model="openai/gpt-4o") + with patch.dict("sys.modules", {"crewai": mock_module}): + hook.get_llm() + + mock_cls.assert_called_once_with( + model="openai/gpt-4o", api_key="sk-test",
base_url="https://custom.api.com" + )
