kaxil commented on code in PR #67450:
URL: https://github.com/apache/airflow/pull/67450#discussion_r3295557121


##########
providers/common/ai/src/airflow/providers/common/ai/hooks/strands_ai.py:
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hooks for LLM agents via the Strands Agents SDK."""
+
+from __future__ import annotations
+
+import functools
+from abc import abstractmethod
+from typing import Any
+
+from airflow.providers.common.ai.hooks.base_ai import (
+    AgentRunRequest,
+    AgentRunResult,
+    BaseAIHook,
+    SkillSpec,
+    ToolSpec,
+)
+
+
+class StrandsHook(BaseAIHook):
+    """
+    Base hook for LLM agents via `Strands Agents 
<https://strandsagents.com/>`__.
+
+    Subclasses implement :meth:`get_model` to return a configured Strands 
model instance
+    (for example :class:`strands.models.gemini.GeminiModel`). The
+    :meth:`create_agent`, :meth:`run_agent`, and :meth:`_tool_spec_to_native`
+    implementations are shared across all Strands model backends.
+    """
+
+    conn_name_attr = "llm_conn_id"
+    default_conn_name = "strands_default"
+
+    supports_toolsets = True
+    supports_durable = False
+    supports_usage_limits = False
+    supports_skills = True
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id if llm_conn_id is not None else 
self.default_conn_name
+        self.model_id = model_id
+        self._resolved_model_id: str | None = None
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return a configured Strands model instance."""
+
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """Convert a 
:class:`~airflow.providers.common.ai.hooks.base_ai.ToolSpec` to a Strands 
tool."""
+        try:
+            from strands import tool as strands_tool
+        except ImportError as e:
+            from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+            raise AirflowOptionalProviderFeatureException(
+                "The 'strands-agents' package is required for StrandsHook. "
+                "Install it with: pip install 
'apache-airflow-providers-common-ai[strands]'"
+            ) from e
+
+        fn = spec.fn
+
+        # Strands infers tool name from __name__ and description from __doc__.
+        # functools.wraps preserves __wrapped__ so inspect.signature() follows 
it
+        # for parameter schema inference, then we override name/doc from spec.
+        @functools.wraps(fn)
+        def tool_fn(*args: Any, **kwargs: Any) -> Any:
+            return fn(*args, **kwargs)
+
+        tool_fn.__name__ = spec.name.replace("-", "_")

Review Comment:
   `spec.name.replace("-", "_")` only handles hyphens. Names with spaces (`"my 
tool"`), dots (`"ns.tool"`), leading digits (`"123-tool"` -> `"123_tool"`), or 
empty strings still pass through and produce invalid Python identifiers, which 
breaks Strands' `inspect.signature()`-based schema inference downstream.
   
   Suggest a stricter normalizer plus a non-empty guard, e.g.:
   
   ```python
   import re
   safe = re.sub(r"\W|^(?=\d)", "_", spec.name) or "tool"
   tool_fn.__name__ = safe
   ```
   
   or reject invalid names up-front in `ToolSpec` validation.



##########
providers/common/ai/provider.yaml:
##########
@@ -59,6 +59,9 @@ integrations:
       - 
/docs/apache-airflow-providers-common-ai/operators/llamaindex_embedding.rst
       - 
/docs/apache-airflow-providers-common-ai/operators/llamaindex_retrieval.rst
     tags: [ai]
+  - integration-name: Strands Agents

Review Comment:
   Missing `how-to-guide:` for the Strands Agents integration. The Pydantic AI 
and LlamaIndex entries above each list `how-to-guide:` pointing at their `.rst` 
files (see lines 47-50 and 58-60), but the Strands block has only 
`integration-name`, `external-doc-url`, and `tags`.
   
   Without `how-to-guide`, the new `strands.rst` / `example_strands.py` won't 
be linked from the integration index page in the docs. Add the `.rst` paths 
once the docs file is in place.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/strands_ai.py:
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hooks for LLM agents via the Strands Agents SDK."""
+
+from __future__ import annotations
+
+import functools
+from abc import abstractmethod
+from typing import Any
+
+from airflow.providers.common.ai.hooks.base_ai import (
+    AgentRunRequest,
+    AgentRunResult,
+    BaseAIHook,
+    SkillSpec,
+    ToolSpec,
+)
+
+
+class StrandsHook(BaseAIHook):
+    """
+    Base hook for LLM agents via `Strands Agents 
<https://strandsagents.com/>`__.
+
+    Subclasses implement :meth:`get_model` to return a configured Strands 
model instance
+    (for example :class:`strands.models.gemini.GeminiModel`). The
+    :meth:`create_agent`, :meth:`run_agent`, and :meth:`_tool_spec_to_native`
+    implementations are shared across all Strands model backends.
+    """
+
+    conn_name_attr = "llm_conn_id"
+    default_conn_name = "strands_default"
+
+    supports_toolsets = True
+    supports_durable = False
+    supports_usage_limits = False
+    supports_skills = True
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id if llm_conn_id is not None else 
self.default_conn_name
+        self.model_id = model_id
+        self._resolved_model_id: str | None = None
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return a configured Strands model instance."""
+
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """Convert a 
:class:`~airflow.providers.common.ai.hooks.base_ai.ToolSpec` to a Strands 
tool."""
+        try:
+            from strands import tool as strands_tool
+        except ImportError as e:
+            from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+            raise AirflowOptionalProviderFeatureException(
+                "The 'strands-agents' package is required for StrandsHook. "
+                "Install it with: pip install 
'apache-airflow-providers-common-ai[strands]'"
+            ) from e
+
+        fn = spec.fn
+
+        # Strands infers tool name from __name__ and description from __doc__.
+        # functools.wraps preserves __wrapped__ so inspect.signature() follows 
it
+        # for parameter schema inference, then we override name/doc from spec.
+        @functools.wraps(fn)
+        def tool_fn(*args: Any, **kwargs: Any) -> Any:
+            return fn(*args, **kwargs)
+
+        tool_fn.__name__ = spec.name.replace("-", "_")
+        tool_fn.__doc__ = spec.description
+        return strands_tool(tool_fn)
+
+    def _skill_spec_to_native(self, skill: str | SkillSpec) -> Any:
+        """Convert a skill source to a Strands-native skill object or path."""
+        if isinstance(skill, SkillSpec) and not skill.path:
+            if skill.name and skill.description and skill.instructions:
+                from strands import Skill
+
+                return Skill(
+                    name=skill.name,
+                    description=skill.description,
+                    instructions=skill.instructions,
+                )
+            raise ValueError("SkillSpec must set 'path' or all of 'name', 
'description', and 'instructions'.")
+        return super()._skill_spec_to_native(skill)
+
+    def _build_skills_plugin(self, request: AgentRunRequest) -> Any | None:
+        """Build a Strands ``AgentSkills`` plugin when skill sources are 
configured."""
+        sources = self._resolve_skill_sources(request)
+        if not sources:
+            return None
+
+        try:
+            from strands import AgentSkills
+        except ImportError as e:
+            from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+            raise AirflowOptionalProviderFeatureException(
+                "The 'strands-agents' package is required for Strands skills. "
+                "Install it with: pip install 
'apache-airflow-providers-common-ai[strands]'"
+            ) from e
+
+        skills_arg: Any = sources[0] if len(sources) == 1 else sources
+        return AgentSkills(skills=skills_arg, **dict(request.skills_params or 
{}))
+
+    def create_agent(self, request: AgentRunRequest) -> Any:
+        """Build a Strands ``Agent`` from *request*."""
+        try:
+            from strands import Agent
+        except ImportError as e:
+            from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+            raise AirflowOptionalProviderFeatureException(
+                "The 'strands-agents' package is required for StrandsHook. "
+                "Install it with: pip install 
'apache-airflow-providers-common-ai[strands]'"
+            ) from e
+
+        native_tools: list[Any] = []
+        if request.toolsets:
+            native_tools = self._resolve_tools(
+                request.toolsets,
+                request.enable_tool_logging,
+                None,  # durable execution is not supported for Strands
+                None,
+            )
+
+        agent_kwargs: dict[str, Any] = dict(request.agent_params or {})
+        if request.instructions:
+            agent_kwargs["system_prompt"] = request.instructions
+
+        plugins: list[Any] = list(agent_kwargs.pop("plugins", []) or [])
+        skills_plugin = self._build_skills_plugin(request)
+        if skills_plugin is not None:
+            plugins.append(skills_plugin)
+        if plugins:
+            agent_kwargs["plugins"] = plugins
+
+        return Agent(model=self.get_model(), tools=native_tools or [], 
**agent_kwargs)
+
+    def run_agent(self, agent: Any, request: AgentRunRequest) -> 
AgentRunResult:
+        """Run the Strands *agent* for *request* and return a normalized 
:class:`AgentRunResult`."""
+        response = agent(request.prompt)
+        return AgentRunResult(
+            output=str(response),
+            model_name=self._resolved_model_id or self.model_id,
+        )
+
+    def test_connection(self) -> tuple[bool, str]:
+        """Validate the connection by instantiating the model (no API call)."""

Review Comment:
   Minor: the "(no API call)" claim isn't guaranteed. `get_model()` calls 
`GeminiModel(**kwargs)`, whose constructor behavior depends on the 
`strands-agents`/`google-genai` SDK version -- some versions probe credentials 
or metadata at construction.
   
   Either drop the parenthetical or pin it: "no remote call as of 
strands-agents X.Y". Otherwise a future SDK bump could quietly turn 
`test_connection` into a network call without anyone noticing.



##########
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py:
##########
@@ -171,6 +181,8 @@ def __init__(
         self.system_prompt = system_prompt
         self.output_type = output_type
         self.toolsets = toolsets
+        self.skills = skills
+        self.skills_params = skills_params or {}

Review Comment:
   `skills_params` was added to `template_fields` (line ~148) but is stored and 
used as a `dict`. Jinja renders a dict to a `str` (e.g. `"{'strict': True}"`), 
not back to a dict, so at execute time `dict(self.skills_params)` either raises 
or yields character pairs.
   
   Options:
   - drop `skills_params` from `template_fields` if templating it isn't a real 
requirement,
   - register a JSON renderer via `template_fields_renderers = 
{"skills_params": "json"}` and pre-parse,
   - or template individual values rather than the whole dict.
   
   Same caveat already applies to the pre-existing `agent_params` -- worth a 
follow-up there.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/strands_ai.py:
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hooks for LLM agents via the Strands Agents SDK."""
+
+from __future__ import annotations
+
+import functools
+from abc import abstractmethod
+from typing import Any
+
+from airflow.providers.common.ai.hooks.base_ai import (
+    AgentRunRequest,
+    AgentRunResult,
+    BaseAIHook,
+    SkillSpec,
+    ToolSpec,
+)
+
+
+class StrandsHook(BaseAIHook):
+    """
+    Base hook for LLM agents via `Strands Agents 
<https://strandsagents.com/>`__.
+
+    Subclasses implement :meth:`get_model` to return a configured Strands 
model instance
+    (for example :class:`strands.models.gemini.GeminiModel`). The
+    :meth:`create_agent`, :meth:`run_agent`, and :meth:`_tool_spec_to_native`
+    implementations are shared across all Strands model backends.
+    """
+
+    conn_name_attr = "llm_conn_id"
+    default_conn_name = "strands_default"
+
+    supports_toolsets = True
+    supports_durable = False
+    supports_usage_limits = False
+    supports_skills = True
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id if llm_conn_id is not None else 
self.default_conn_name
+        self.model_id = model_id
+        self._resolved_model_id: str | None = None
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return a configured Strands model instance."""
+
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """Convert a 
:class:`~airflow.providers.common.ai.hooks.base_ai.ToolSpec` to a Strands 
tool."""
+        try:
+            from strands import tool as strands_tool
+        except ImportError as e:
+            from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+            raise AirflowOptionalProviderFeatureException(
+                "The 'strands-agents' package is required for StrandsHook. "
+                "Install it with: pip install 
'apache-airflow-providers-common-ai[strands]'"
+            ) from e
+
+        fn = spec.fn
+
+        # Strands infers tool name from __name__ and description from __doc__.
+        # functools.wraps preserves __wrapped__ so inspect.signature() follows 
it
+        # for parameter schema inference, then we override name/doc from spec.
+        @functools.wraps(fn)
+        def tool_fn(*args: Any, **kwargs: Any) -> Any:
+            return fn(*args, **kwargs)
+
+        tool_fn.__name__ = spec.name.replace("-", "_")
+        tool_fn.__doc__ = spec.description
+        return strands_tool(tool_fn)
+
+    def _skill_spec_to_native(self, skill: str | SkillSpec) -> Any:
+        """Convert a skill source to a Strands-native skill object or path."""
+        if isinstance(skill, SkillSpec) and not skill.path:

Review Comment:
   Precedence behavior here silently drops fields: when a `SkillSpec` has 
**both** `path` set and inline `name`/`description`/`instructions` set, the 
`not skill.path` branch is skipped and `super()._skill_spec_to_native(skill)` 
returns `skill.path` -- the inline fields are discarded with no warning.
   
   Also, validation only happens inside the Strands branch, so a 
`SkillSpec(name="x")` (missing instructions/description, no path) only fails 
when Strands is the backend. Other hooks via the base path get a different 
error or none.
   
   Recommend moving the mutually-exclusive validation into 
`SkillSpec.__post_init__` so misuse fails fast regardless of backend, and 
explicitly rejecting `path + inline` combinations rather than silently 
preferring one.



##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_strands.py:
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAGs demonstrating Strands agent hooks, AgentOperator, and 
skills."""
+
+from __future__ import annotations
+
+from airflow.providers.common.ai.hooks.base_ai import AgentRunRequest, 
BaseAIHook, SkillSpec
+from airflow.providers.common.ai.operators.agent import AgentOperator
+from airflow.providers.common.ai.toolsets.sql import SQLToolset
+from airflow.providers.common.compat.sdk import dag, task
+
+# Requires: pip install 'apache-airflow-providers-common-ai[strands]'
+# Connection: strands_default (conn_type selects backend — e.g. strands-gemini 
for Google Gemini)
+
+# ---------------------------------------------------------------------------
+# 1. Basic AgentOperator with Strands
+# ---------------------------------------------------------------------------
+
+
+# [START howto_operator_strands_basic]
+@dag(schedule=None, tags=["example"])
+def example_strands_basic():
+    AgentOperator(
+        task_id="summarize",
+        prompt="Summarize the key benefits of using Airflow for data 
pipelines.",
+        llm_conn_id="strands_default",
+        system_prompt="You are a concise technical writer.",
+    )
+
+
+# [END howto_operator_strands_basic]
+
+example_strands_basic()
+
+
+# ---------------------------------------------------------------------------
+# 2. AgentOperator with skills (filesystem paths)
+# ---------------------------------------------------------------------------
+
+
+# [START howto_operator_strands_skills]
+@dag(schedule=None, tags=["example"])
+def example_strands_skills():
+    AgentOperator(
+        task_id="process_pdf",
+        prompt="Extract tables from report.pdf and summarize the findings.",
+        llm_conn_id="strands_default",
+        system_prompt="You are a document processing assistant.",
+        skills=["/opt/airflow/skills/pdf-processing"],

Review Comment:
   Hardcoded host paths (`/opt/airflow/skills/pdf-processing` here, 
`/opt/airflow/skills/web-research` further down) won't exist on a stock install 
-- anyone copy-pasting the example will hit a confusing failure at agent build 
time.
   
   Suggest one of: (1) use `Variable.get("strands_skill_path")` so the path is 
configurable, (2) point to a path that ships with the provider (e.g. relative 
to `example_dags/skills/`), or (3) add an inline comment explicitly telling the 
user to place their skill bundle at that path before running the DAG.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to