This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2440d198873 Add `LLMOperator` and `@task.llm` to common.ai provider 
(#62598)
2440d198873 is described below

commit 2440d1988732992ca320d197b05afb263692df13
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Feb 28 03:15:44 2026 +0000

    Add `LLMOperator` and `@task.llm` to common.ai provider (#62598)
    
    
    General-purpose LLM operator and TaskFlow decorator for prompt-based
    tasks (summarization, extraction, classification, structured output).
    The operator wraps PydanticAIHook.create_agent() with support for
    structured output via Pydantic BaseModel and pass-through agent_params
    for pydantic-ai Agent configuration (retries, model_settings, tools).
    
    Adds the "ai" tag to provider.yaml.schema.json for docs grouping.
    
    Co-authored-by: GPK <[email protected]>
    
    * Fix mypy and provider.yaml validation CI failures
    
    - Add type annotation for `agent` variable (mypy var-annotated)
    - Add `how-to-guide` to provider.yaml for llm.rst doc
    
    * Add 'llm' to spelling wordlist
    
    Auto-generated doc index files for example DAGs contain 'llm' in
    plain text where backtick quoting is not possible.
    
    ---------
    
    Co-authored-by: GPK <[email protected]>
---
 airflow-core/src/airflow/provider.yaml.schema.json |   1 +
 docs/spelling_wordlist.txt                         |   1 +
 .../operators-and-hooks-ref/ai.rst                 |  28 +++++
 providers/common/ai/docs/index.rst                 |   1 +
 providers/common/ai/docs/operators/index.rst       |  25 ++++
 providers/common/ai/docs/operators/llm.rst         | 120 +++++++++++++++++++
 providers/common/ai/provider.yaml                  |  16 ++-
 .../providers/common/ai/decorators/__init__.py     |  16 +++
 .../airflow/providers/common/ai/decorators/llm.py  | 133 +++++++++++++++++++++
 .../common/ai/example_dags/example_llm.py          | 116 ++++++++++++++++++
 .../example_dags/example_llm_analysis_pipeline.py  |  81 +++++++++++++
 .../ai/example_dags/example_llm_classification.py  |  53 ++++++++
 .../providers/common/ai/get_provider_info.py       |  16 ++-
 .../providers/common/ai/operators/__init__.py      |  16 +++
 .../airflow/providers/common/ai/operators/llm.py   |  99 +++++++++++++++
 .../ai/tests/unit/common/ai/decorators/__init__.py |  16 +++
 .../ai/tests/unit/common/ai/decorators/test_llm.py |  85 +++++++++++++
 .../ai/tests/unit/common/ai/operators/__init__.py  |  16 +++
 .../ai/tests/unit/common/ai/operators/test_llm.py  |  79 ++++++++++++
 19 files changed, 915 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/provider.yaml.schema.json 
b/airflow-core/src/airflow/provider.yaml.schema.json
index 1fdf1844620..1eb74514015 100644
--- a/airflow-core/src/airflow/provider.yaml.schema.json
+++ b/airflow-core/src/airflow/provider.yaml.schema.json
@@ -89,6 +89,7 @@
                         "items": {
                             "type": "string",
                             "enum": [
+                                "ai",
                                 "alibaba",
                                 "apache",
                                 "aws",
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 6b157586513..015f8d77d7b 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1103,6 +1103,7 @@ LiteralValue
 Liveness
 liveness
 livy
+llm
 loadBalancerIP
 localExecutor
 localexecutor
diff --git a/providers-summary-docs/operators-and-hooks-ref/ai.rst 
b/providers-summary-docs/operators-and-hooks-ref/ai.rst
new file mode 100644
index 00000000000..12868c939e9
--- /dev/null
+++ b/providers-summary-docs/operators-and-hooks-ref/ai.rst
@@ -0,0 +1,28 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+AI integrations
+---------------
+
+AI / LLM
+'''''''''
+
+These integrations allow you to interact with AI and LLM services.
+
+.. operators-hooks-ref::
+   :tags: ai
+   :header-separator: "
diff --git a/providers/common/ai/docs/index.rst 
b/providers/common/ai/docs/index.rst
index 19077bfe183..dbd01f4b3d3 100644
--- a/providers/common/ai/docs/index.rst
+++ b/providers/common/ai/docs/index.rst
@@ -36,6 +36,7 @@
 
     Connection types <connections/pydantic_ai>
     Hooks <hooks/pydantic_ai>
+    Operators <operators/index>
 
 .. toctree::
     :hidden:
diff --git a/providers/common/ai/docs/operators/index.rst 
b/providers/common/ai/docs/operators/index.rst
new file mode 100644
index 00000000000..5ca15266335
--- /dev/null
+++ b/providers/common/ai/docs/operators/index.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Common AI Operators
+===================
+
+.. toctree::
+    :maxdepth: 1
+    :glob:
+
+    *
diff --git a/providers/common/ai/docs/operators/llm.rst 
b/providers/common/ai/docs/operators/llm.rst
new file mode 100644
index 00000000000..21df887b041
--- /dev/null
+++ b/providers/common/ai/docs/operators/llm.rst
@@ -0,0 +1,120 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:llm:
+
+``LLMOperator``
+===============
+
+Use :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` for
+general-purpose LLM calls — summarization, extraction, classification,
+structured output, or any prompt-based task.
+
+The operator sends a prompt to an LLM via
+:class:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook` and
+returns the output as XCom.
+
+.. seealso::
+    :ref:`Connection configuration <howto/connection:pydantic_ai>`
+
+Basic Usage
+-----------
+
+Provide a ``prompt`` and the operator returns the LLM's response as a string:
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+    :language: python
+    :start-after: [START howto_operator_llm_basic]
+    :end-before: [END howto_operator_llm_basic]
+
+Structured Output
+-----------------
+
+Set ``output_type`` to a Pydantic ``BaseModel`` subclass. The LLM is instructed
+to return structured data, and the result is serialized via ``model_dump()``
+for XCom:
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+    :language: python
+    :start-after: [START howto_operator_llm_structured]
+    :end-before: [END howto_operator_llm_structured]
+
+Agent Parameters
+----------------
+
+Pass additional keyword arguments to the pydantic-ai ``Agent`` constructor
+via ``agent_params`` — for example, ``retries``, ``model_settings``, or 
``tools``.
+See the `pydantic-ai Agent docs <https://ai.pydantic.dev/api/agent/>`__ for
+the full list of supported parameters.
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+    :language: python
+    :start-after: [START howto_operator_llm_agent_params]
+    :end-before: [END howto_operator_llm_agent_params]
+
+TaskFlow Decorator
+------------------
+
+The ``@task.llm`` decorator wraps ``LLMOperator``. The function returns the
+prompt string; all other parameters are passed to the operator:
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+    :language: python
+    :start-after: [START howto_decorator_llm]
+    :end-before: [END howto_decorator_llm]
+
+With structured output:
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+    :language: python
+    :start-after: [START howto_decorator_llm_structured]
+    :end-before: [END howto_decorator_llm_structured]
+
+Classification with ``Literal``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Set ``output_type`` to a ``Literal`` to constrain the LLM to a fixed set of
+labels — useful for classification tasks:
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
+    :language: python
+    :start-after: [START howto_decorator_llm_classification]
+    :end-before: [END howto_decorator_llm_classification]
+
+Multi-task pipeline with dynamic mapping
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Combine ``@task.llm`` with upstream and downstream tasks. Use ``.expand()``
+to process a list of items in parallel:
+
+.. exampleinclude:: 
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
+    :language: python
+    :start-after: [START howto_decorator_llm_pipeline]
+    :end-before: [END howto_decorator_llm_pipeline]
+
+Parameters
+----------
+
+- ``prompt``: The prompt to send to the LLM (operator) or the return value of 
the
+  decorated function (decorator).
+- ``llm_conn_id``: Airflow connection ID for the LLM provider.
+- ``model_id``: Model identifier (e.g. ``"openai:gpt-5"``). Overrides the 
connection's extra field.
+- ``system_prompt``: System-level instructions for the agent. Supports Jinja 
templating.
+- ``output_type``: Expected output type (default: ``str``). Set to a Pydantic 
``BaseModel``
+  for structured output.
+- ``agent_params``: Additional keyword arguments passed to the pydantic-ai 
``Agent``
+  constructor (e.g. ``retries``, ``model_settings``, ``tools``). Supports 
Jinja templating.
diff --git a/providers/common/ai/provider.yaml 
b/providers/common/ai/provider.yaml
index 247371f289b..e2c4ca8fe60 100644
--- a/providers/common/ai/provider.yaml
+++ b/providers/common/ai/provider.yaml
@@ -29,9 +29,14 @@ versions:
   - 0.0.1
 
 integrations:
+  - integration-name: Common AI
+    external-doc-url: 
https://airflow.apache.org/docs/apache-airflow-providers-common-ai/
+    how-to-guide:
+      - /docs/apache-airflow-providers-common-ai/operators/llm.rst
+    tags: [ai]
   - integration-name: Pydantic AI
     external-doc-url: https://ai.pydantic.dev/
-    tags: [software]
+    tags: [ai]
 
 hooks:
   - integration-name: Pydantic AI
@@ -51,3 +56,12 @@ connection-types:
       placeholders:
         host: "https://api.openai.com/v1 (optional, for custom endpoints)"
         extra: '{"model": "openai:gpt-5"}'
+
+operators:
+  - integration-name: Common AI
+    python-modules:
+      - airflow.providers.common.ai.operators.llm
+
+task-decorators:
+  - class-name: airflow.providers.common.ai.decorators.llm.llm_task
+    name: llm
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/decorators/__init__.py 
b/providers/common/ai/src/airflow/providers/common/ai/decorators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py 
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
new file mode 100644
index 00000000000..04c194aed09
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskFlow decorator for general-purpose LLM calls.
+
+The user writes a function that **returns the prompt string**. The decorator
+handles hook creation, agent configuration, LLM call, and output serialization.
+When ``output_type`` is a Pydantic ``BaseModel``, the result is serialized via
+``model_dump()`` for XCom.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Collection, Mapping, Sequence
+from typing import TYPE_CHECKING, Any, ClassVar
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.compat.sdk import (
+    DecoratedOperator,
+    TaskDecorator,
+    context_merge,
+    task_decorator_factory,
+)
+from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
+from airflow.utils.operator_helpers import determine_kwargs
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class _LLMDecoratedOperator(DecoratedOperator, LLMOperator):
+    """
+    Wraps a callable that returns a prompt for a general-purpose LLM call.
+
+    The user function is called at execution time to produce the prompt string.
+    All other parameters (``llm_conn_id``, ``model_id``, ``system_prompt``, 
etc.)
+    are passed through to 
:class:`~airflow.providers.common.ai.operators.llm.LLMOperator`.
+
+    :param python_callable: A reference to a callable that returns the prompt 
string.
+    :param op_args: Positional arguments for the callable.
+    :param op_kwargs: Keyword arguments for the callable.
+    """
+
+    template_fields: Sequence[str] = (
+        *DecoratedOperator.template_fields,
+        *LLMOperator.template_fields,
+    )
+    template_fields_renderers: ClassVar[dict[str, str]] = {
+        **DecoratedOperator.template_fields_renderers,
+    }
+
+    custom_operator_name: str = "@task.llm"
+
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        op_args: Collection[Any] | None = None,
+        op_kwargs: Mapping[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(
+            python_callable=python_callable,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            prompt=SET_DURING_EXECUTION,
+            **kwargs,
+        )
+
+    def execute(self, context: Context) -> Any:
+        context_merge(context, self.op_kwargs)
+        kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+
+        self.prompt = self.python_callable(*self.op_args, **kwargs)
+
+        if not isinstance(self.prompt, str) or not self.prompt.strip():
+            raise TypeError("The returned value from the @task.llm callable 
must be a non-empty string.")
+
+        self.render_template_fields(context)
+        return LLMOperator.execute(self, context)
+
+
+def llm_task(
+    python_callable: Callable | None = None,
+    **kwargs,
+) -> TaskDecorator:
+    """
+    Wrap a function that returns a prompt into a general-purpose LLM task.
+
+    The function body constructs the prompt (can use Airflow context, XCom, 
etc.).
+    The decorator handles hook creation, agent configuration, LLM call, and 
output
+    serialization.
+
+    Usage::
+
+        @task.llm(
+            llm_conn_id="openai_default",
+            system_prompt="Summarize concisely.",
+        )
+        def summarize(text: str):
+            return f"Summarize this article: {text}"
+
+    With structured output::
+
+        @task.llm(
+            llm_conn_id="openai_default",
+            system_prompt="Extract named entities.",
+            output_type=Entities,
+        )
+        def extract(text: str):
+            return f"Extract entities from: {text}"
+
+    :param python_callable: Function to decorate.
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        decorated_operator_class=_LLMDecoratedOperator,
+        **kwargs,
+    )
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm.py
new file mode 100644
index 00000000000..f0355a59129
--- /dev/null
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAGs demonstrating LLMOperator and @task.llm usage."""
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_operator_llm_basic]
+@dag
+def example_llm_operator():
+    LLMOperator(
+        task_id="summarize",
+        prompt="Summarize the key findings from the Q4 earnings report.",
+        llm_conn_id="pydantic_ai_default",
+        system_prompt="You are a financial analyst. Be concise.",
+    )
+
+
+# [END howto_operator_llm_basic]
+
+example_llm_operator()
+
+
+# [START howto_operator_llm_structured]
+@dag
+def example_llm_operator_structured():
+    class Entities(BaseModel):
+        names: list[str]
+        locations: list[str]
+
+    LLMOperator(
+        task_id="extract_entities",
+        prompt="Extract all named entities from the article.",
+        llm_conn_id="pydantic_ai_default",
+        system_prompt="Extract named entities.",
+        output_type=Entities,
+    )
+
+
+# [END howto_operator_llm_structured]
+
+example_llm_operator_structured()
+
+
+# [START howto_operator_llm_agent_params]
+@dag
+def example_llm_operator_agent_params():
+    LLMOperator(
+        task_id="creative_writing",
+        prompt="Write a haiku about data pipelines.",
+        llm_conn_id="pydantic_ai_default",
+        system_prompt="You are a creative writer.",
+        agent_params={"model_settings": {"temperature": 0.9}, "retries": 3},
+    )
+
+
+# [END howto_operator_llm_agent_params]
+
+example_llm_operator_agent_params()
+
+
+# [START howto_decorator_llm]
+@dag
+def example_llm_decorator():
+    @task.llm(llm_conn_id="pydantic_ai_default", system_prompt="Summarize 
concisely.")
+    def summarize(text: str):
+        return f"Summarize this article: {text}"
+
+    summarize("Apache Airflow is a platform for programmatically authoring...")
+
+
+# [END howto_decorator_llm]
+
+example_llm_decorator()
+
+
+# [START howto_decorator_llm_structured]
+@dag
+def example_llm_decorator_structured():
+    class Entities(BaseModel):
+        names: list[str]
+        locations: list[str]
+
+    @task.llm(
+        llm_conn_id="pydantic_ai_default",
+        system_prompt="Extract named entities.",
+        output_type=Entities,
+    )
+    def extract(text: str):
+        return f"Extract entities from: {text}"
+
+    extract("Alice visited Paris and met Bob in London.")
+
+
+# [END howto_decorator_llm_structured]
+
+example_llm_decorator_structured()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
new file mode 100644
index 00000000000..941ba802a0d
--- /dev/null
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG: triage support tickets with @task.llm, structured output, and 
dynamic task mapping."""
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_decorator_llm_pipeline]
+@dag
+def example_llm_analysis_pipeline():
+    class TicketAnalysis(BaseModel):
+        priority: str
+        category: str
+        summary: str
+        suggested_action: str
+
+    @task
+    def get_support_tickets():
+        """Fetch unprocessed support tickets."""
+        return [
+            (
+                "Our nightly ETL pipeline has been failing for the past 3 
days. "
+                "The error shows a connection timeout to the Postgres source 
database. "
+                "This is blocking our daily financial reports."
+            ),
+            (
+                "We'd like to add a new connection type for our internal ML 
model registry. "
+                "Is there documentation on creating custom hooks?"
+            ),
+            (
+                "After upgrading to the latest version, the Grid view takes 
over "
+                "30 seconds to load for DAGs with more than 500 tasks. "
+                "Previously it loaded in under 5 seconds."
+            ),
+        ]
+
+    @task.llm(
+        llm_conn_id="pydantic_ai_default",
+        system_prompt=(
+            "Analyze the support ticket and extract: "
+            "priority (critical/high/medium/low), "
+            "category (bug/feature_request/question/performance), "
+            "a one-sentence summary, and a suggested next action."
+        ),
+        output_type=TicketAnalysis,
+    )
+    def analyze_ticket(ticket: str):
+        return f"Analyze this support ticket:\n\n{ticket}"
+
+    @task
+    def store_results(analyses: list[dict]):
+        """Store ticket analyses. In production, this would write to a 
database or ticketing system."""
+        for analysis in analyses:
+            print(f"[{analysis['priority'].upper()}] {analysis['category']}: 
{analysis['summary']}")
+
+    tickets = get_support_tickets()
+    analyses = analyze_ticket.expand(ticket=tickets)
+    store_results(analyses)
+
+
+# [END howto_decorator_llm_pipeline]
+
+example_llm_analysis_pipeline()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
new file mode 100644
index 00000000000..52081cb058b
--- /dev/null
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG: classify pipeline incidents by severity using @task.llm with 
Literal output."""
+
+from __future__ import annotations
+
+from typing import Literal
+
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_decorator_llm_classification]
+@dag
+def example_llm_classification():
+    @task.llm(
+        llm_conn_id="pydantic_ai_default",
+        system_prompt=(
+            "Classify the severity of the given pipeline incident. "
+            "Use 'critical' for data loss or complete pipeline failure, "
+            "'high' for significant delays or partial failures, "
+            "'medium' for degraded performance, "
+            "'low' for cosmetic issues or minor warnings."
+        ),
+        output_type=Literal["critical", "high", "medium", "low"],
+    )
+    def classify_incident(description: str):
+        # Pre-process the description before sending to the LLM
+        return f"Classify this incident:\n{description.strip()}"
+
+    classify_incident(
+        "Scheduler heartbeat lost for 15 minutes. "
+        "Multiple DAG runs stuck in queued state. "
+        "No new tasks being scheduled across all DAGs."
+    )
+
+
+# [END howto_decorator_llm_classification]
+
+example_llm_classification()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py 
b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
index ee2a7c03a7f..f7ae5317401 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
@@ -27,11 +27,17 @@ def get_provider_info():
         "name": "Common AI",
         "description": "AI/LLM hooks and operators for Airflow pipelines using 
`pydantic-ai <https://ai.pydantic.dev/>`__.\n",
         "integrations": [
+            {
+                "integration-name": "Common AI",
+                "external-doc-url": "https://airflow.apache.org/docs/apache-airflow-providers-common-ai/",
+                "how-to-guide": 
["/docs/apache-airflow-providers-common-ai/operators/llm.rst"],
+                "tags": ["ai"],
+            },
             {
                 "integration-name": "Pydantic AI",
                 "external-doc-url": "https://ai.pydantic.dev/",
-                "tags": ["software"],
-            }
+                "tags": ["ai"],
+            },
         ],
         "hooks": [
             {
@@ -53,4 +59,10 @@ def get_provider_info():
                 },
             }
         ],
+        "operators": [
+            {"integration-name": "Common AI", "python-modules": 
["airflow.providers.common.ai.operators.llm"]}
+        ],
+        "task-decorators": [
+            {"class-name": 
"airflow.providers.common.ai.decorators.llm.llm_task", "name": "llm"}
+        ],
     }
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/operators/__init__.py 
b/providers/common/ai/src/airflow/providers/common/ai/operators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/operators/llm.py 
b/providers/common/ai/src/airflow/providers/common/ai/operators/llm.py
new file mode 100644
index 00000000000..b7cd69ecf1e
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/operators/llm.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for general-purpose LLM calls."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+from airflow.providers.common.compat.sdk import BaseOperator
+
+if TYPE_CHECKING:
+    from pydantic_ai import Agent
+
+    from airflow.sdk import Context
+
+
class LLMOperator(BaseOperator):
    """
    Send a prompt to an LLM and return its response.

    LLM access goes through a
    :class:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook`.
    By default the raw string response is returned; passing a Pydantic
    ``BaseModel`` subclass as ``output_type`` yields structured output,
    which is converted with ``model_dump()`` so it is XCom-serializable.

    :param prompt: Prompt text sent to the LLM.
    :param llm_conn_id: Connection ID of the LLM provider.
    :param model_id: Model identifier (e.g. ``"openai:gpt-5"``); takes
        precedence over the model stored in the connection's extra field.
    :param system_prompt: System-level instructions for the LLM agent.
    :param output_type: Expected output type; ``str`` by default, or a
        Pydantic ``BaseModel`` subclass for structured results.
    :param agent_params: Extra keyword arguments forwarded to the pydantic-ai
        ``Agent`` constructor (e.g. ``retries``, ``model_settings``,
        ``tools``). See the
        `pydantic-ai Agent docs <https://ai.pydantic.dev/api/agent/>`__
        for the full list.
    """

    template_fields: Sequence[str] = (
        "prompt",
        "llm_conn_id",
        "model_id",
        "system_prompt",
        "agent_params",
    )

    def __init__(
        self,
        *,
        prompt: str,
        llm_conn_id: str,
        model_id: str | None = None,
        system_prompt: str = "",
        output_type: type = str,
        agent_params: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.prompt = prompt
        self.llm_conn_id = llm_conn_id
        self.model_id = model_id
        self.system_prompt = system_prompt
        self.output_type = output_type
        # Normalize a missing mapping to {} so execute() can always splat it.
        self.agent_params = agent_params or {}

    @cached_property
    def llm_hook(self) -> PydanticAIHook:
        """Hook wrapping the configured LLM connection."""
        return PydanticAIHook(llm_conn_id=self.llm_conn_id, model_id=self.model_id)

    def execute(self, context: Context) -> Any:
        llm_agent: Agent[None, Any] = self.llm_hook.create_agent(
            output_type=self.output_type,
            instructions=self.system_prompt,
            **self.agent_params,
        )
        response = llm_agent.run_sync(self.prompt).output
        # Structured (BaseModel) results are dumped to plain dicts for XCom.
        if isinstance(response, BaseModel):
            return response.model_dump()
        return response
diff --git a/providers/common/ai/tests/unit/common/ai/decorators/__init__.py 
b/providers/common/ai/tests/unit/common/ai/decorators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/decorators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/common/ai/tests/unit/common/ai/decorators/test_llm.py 
b/providers/common/ai/tests/unit/common/ai/decorators/test_llm.py
new file mode 100644
index 00000000000..d7161dfef1c
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/decorators/test_llm.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.common.ai.decorators.llm import _LLMDecoratedOperator
+
+
class TestLLMDecoratedOperator:
    """Unit tests for the ``@task.llm`` decorated operator."""

    @staticmethod
    def _stub_agent(hook_cls_mock, output):
        # Wire the patched hook class so create_agent() hands back an agent
        # whose run_sync() result exposes *output*.
        agent = MagicMock(spec=["run_sync"])
        run_result = MagicMock(spec=["output"])
        run_result.output = output
        agent.run_sync.return_value = run_result
        hook_cls_mock.return_value.create_agent.return_value = agent
        return agent

    def test_custom_operator_name(self):
        assert _LLMDecoratedOperator.custom_operator_name == "@task.llm"

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_calls_callable_and_returns_output(self, mock_hook_cls):
        """The callable's return value becomes the LLM prompt."""
        agent = self._stub_agent(mock_hook_cls, "This is a summary.")

        def my_prompt():
            return "Summarize this text"

        op = _LLMDecoratedOperator(task_id="test", python_callable=my_prompt, llm_conn_id="my_llm")
        result = op.execute(context={})

        assert result == "This is a summary."
        assert op.prompt == "Summarize this text"
        agent.run_sync.assert_called_once_with("Summarize this text")

    @pytest.mark.parametrize(
        "return_value",
        [42, "", "   ", None],
        ids=["non-string", "empty", "whitespace", "none"],
    )
    def test_execute_raises_on_invalid_prompt(self, return_value):
        """TypeError when the callable returns a non-string or blank string."""
        op = _LLMDecoratedOperator(
            task_id="test",
            python_callable=lambda: return_value,
            llm_conn_id="my_llm",
        )
        with pytest.raises(TypeError, match="non-empty string"):
            op.execute(context={})

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_merges_op_kwargs_into_callable(self, mock_hook_cls):
        """op_kwargs are resolved by the callable to build the prompt."""
        agent = self._stub_agent(mock_hook_cls, "done")

        def my_prompt(topic):
            return f"Summarize {topic}"

        op = _LLMDecoratedOperator(
            task_id="test",
            python_callable=my_prompt,
            llm_conn_id="my_llm",
            op_kwargs={"topic": "quantum computing"},
        )
        op.execute(context={"task_instance": MagicMock()})

        assert op.prompt == "Summarize quantum computing"
        agent.run_sync.assert_called_once_with("Summarize quantum computing")
diff --git a/providers/common/ai/tests/unit/common/ai/operators/__init__.py 
b/providers/common/ai/tests/unit/common/ai/operators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_llm.py 
b/providers/common/ai/tests/unit/common/ai/operators/test_llm.py
new file mode 100644
index 00000000000..de7b35000af
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/operators/test_llm.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+
+
class TestLLMOperator:
    """Unit tests for :class:`LLMOperator`."""

    @staticmethod
    def _stub_agent(hook_cls_mock, output):
        # Make the patched hook class return an agent whose run_sync()
        # result carries *output*.
        agent = MagicMock(spec=["run_sync"])
        run_result = MagicMock(spec=["output"])
        run_result.output = output
        agent.run_sync.return_value = run_result
        hook_cls_mock.return_value.create_agent.return_value = agent
        return agent

    def test_template_fields(self):
        expected = {"prompt", "llm_conn_id", "model_id", "system_prompt", "agent_params"}
        assert set(LLMOperator.template_fields) == expected

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_returns_string_output(self, mock_hook_cls):
        """Default output_type=str returns the LLM string directly."""
        agent = self._stub_agent(mock_hook_cls, "Paris is the capital of France.")

        op = LLMOperator(task_id="test", prompt="What is the capital of France?", llm_conn_id="my_llm")
        result = op.execute(context=MagicMock())

        assert result == "Paris is the capital of France."
        agent.run_sync.assert_called_once_with("What is the capital of France?")
        mock_hook_cls.return_value.create_agent.assert_called_once_with(output_type=str, instructions="")
        mock_hook_cls.assert_called_once_with(llm_conn_id="my_llm", model_id=None)

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_structured_output_with_all_params(self, mock_hook_cls):
        """Structured output via model_dump(), with model_id, system_prompt, and agent_params."""

        class Entities(BaseModel):
            names: list[str]

        agent = self._stub_agent(mock_hook_cls, Entities(names=["Alice", "Bob"]))

        op = LLMOperator(
            task_id="test",
            prompt="Extract entities",
            llm_conn_id="my_llm",
            model_id="openai:gpt-5",
            system_prompt="You are an extractor.",
            output_type=Entities,
            agent_params={"retries": 3, "model_settings": {"temperature": 0.9}},
        )
        result = op.execute(context=MagicMock())

        assert result == {"names": ["Alice", "Bob"]}
        assert agent.run_sync.call_count == 1
        mock_hook_cls.assert_called_once_with(llm_conn_id="my_llm", model_id="openai:gpt-5")
        mock_hook_cls.return_value.create_agent.assert_called_once_with(
            output_type=Entities,
            instructions="You are an extractor.",
            retries=3,
            model_settings={"temperature": 0.9},
        )


Reply via email to