This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2440d198873 Add `LLMOperator` and `@task.llm` to common.ai provider
(#62598)
2440d198873 is described below
commit 2440d1988732992ca320d197b05afb263692df13
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Feb 28 03:15:44 2026 +0000
Add `LLMOperator` and `@task.llm` to common.ai provider (#62598)
General-purpose LLM operator and TaskFlow decorator for prompt-based
tasks (summarization, extraction, classification, structured output).
The operator wraps PydanticAIHook.create_agent() with support for
structured output via Pydantic BaseModel and pass-through agent_params
for pydantic-ai Agent configuration (retries, model_settings, tools).
Adds the "ai" tag to provider.yaml.schema.json for docs grouping.
Co-authored-by: GPK <[email protected]>
* Fix mypy and provider.yaml validation CI failures
- Add type annotation for `agent` variable (mypy var-annotated)
- Add `how-to-guide` to provider.yaml for llm.rst doc
* Add 'llm' to spelling wordlist
Auto-generated doc index files for example DAGs contain 'llm' in
plain text where backtick quoting is not possible.
---------
Co-authored-by: GPK <[email protected]>
---
airflow-core/src/airflow/provider.yaml.schema.json | 1 +
docs/spelling_wordlist.txt | 1 +
.../operators-and-hooks-ref/ai.rst | 28 +++++
providers/common/ai/docs/index.rst | 1 +
providers/common/ai/docs/operators/index.rst | 25 ++++
providers/common/ai/docs/operators/llm.rst | 120 +++++++++++++++++++
providers/common/ai/provider.yaml | 16 ++-
.../providers/common/ai/decorators/__init__.py | 16 +++
.../airflow/providers/common/ai/decorators/llm.py | 133 +++++++++++++++++++++
.../common/ai/example_dags/example_llm.py | 116 ++++++++++++++++++
.../example_dags/example_llm_analysis_pipeline.py | 81 +++++++++++++
.../ai/example_dags/example_llm_classification.py | 53 ++++++++
.../providers/common/ai/get_provider_info.py | 16 ++-
.../providers/common/ai/operators/__init__.py | 16 +++
.../airflow/providers/common/ai/operators/llm.py | 99 +++++++++++++++
.../ai/tests/unit/common/ai/decorators/__init__.py | 16 +++
.../ai/tests/unit/common/ai/decorators/test_llm.py | 85 +++++++++++++
.../ai/tests/unit/common/ai/operators/__init__.py | 16 +++
.../ai/tests/unit/common/ai/operators/test_llm.py | 79 ++++++++++++
19 files changed, 915 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/provider.yaml.schema.json
b/airflow-core/src/airflow/provider.yaml.schema.json
index 1fdf1844620..1eb74514015 100644
--- a/airflow-core/src/airflow/provider.yaml.schema.json
+++ b/airflow-core/src/airflow/provider.yaml.schema.json
@@ -89,6 +89,7 @@
"items": {
"type": "string",
"enum": [
+ "ai",
"alibaba",
"apache",
"aws",
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 6b157586513..015f8d77d7b 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1103,6 +1103,7 @@ LiteralValue
Liveness
liveness
livy
+llm
loadBalancerIP
localExecutor
localexecutor
diff --git a/providers-summary-docs/operators-and-hooks-ref/ai.rst
b/providers-summary-docs/operators-and-hooks-ref/ai.rst
new file mode 100644
index 00000000000..12868c939e9
--- /dev/null
+++ b/providers-summary-docs/operators-and-hooks-ref/ai.rst
@@ -0,0 +1,28 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+AI integrations
+---------------
+
+AI / LLM
+'''''''''
+
+These integrations allow you to interact with AI and LLM services.
+
+.. operators-hooks-ref::
+ :tags: ai
+ :header-separator: "
diff --git a/providers/common/ai/docs/index.rst
b/providers/common/ai/docs/index.rst
index 19077bfe183..dbd01f4b3d3 100644
--- a/providers/common/ai/docs/index.rst
+++ b/providers/common/ai/docs/index.rst
@@ -36,6 +36,7 @@
Connection types <connections/pydantic_ai>
Hooks <hooks/pydantic_ai>
+ Operators <operators/index>
.. toctree::
:hidden:
diff --git a/providers/common/ai/docs/operators/index.rst
b/providers/common/ai/docs/operators/index.rst
new file mode 100644
index 00000000000..5ca15266335
--- /dev/null
+++ b/providers/common/ai/docs/operators/index.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Common AI Operators
+===================
+
+.. toctree::
+ :maxdepth: 1
+ :glob:
+
+ *
diff --git a/providers/common/ai/docs/operators/llm.rst
b/providers/common/ai/docs/operators/llm.rst
new file mode 100644
index 00000000000..21df887b041
--- /dev/null
+++ b/providers/common/ai/docs/operators/llm.rst
@@ -0,0 +1,120 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/operator:llm:
+
+``LLMOperator``
+===============
+
+Use :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` for
+general-purpose LLM calls — summarization, extraction, classification,
+structured output, or any prompt-based task.
+
+The operator sends a prompt to an LLM via
+:class:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook` and
+returns the output as XCom.
+
+.. seealso::
+ :ref:`Connection configuration <howto/connection:pydantic_ai>`
+
+Basic Usage
+-----------
+
+Provide a ``prompt`` and the operator returns the LLM's response as a string:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+ :language: python
+ :start-after: [START howto_operator_llm_basic]
+ :end-before: [END howto_operator_llm_basic]
+
+Structured Output
+-----------------
+
+Set ``output_type`` to a Pydantic ``BaseModel`` subclass. The LLM is instructed
+to return structured data, and the result is serialized via ``model_dump()``
+for XCom:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+ :language: python
+ :start-after: [START howto_operator_llm_structured]
+ :end-before: [END howto_operator_llm_structured]
+
+Agent Parameters
+----------------
+
+Pass additional keyword arguments to the pydantic-ai ``Agent`` constructor
+via ``agent_params`` — for example, ``retries``, ``model_settings``, or
``tools``.
+See the `pydantic-ai Agent docs <https://ai.pydantic.dev/api/agent/>`__ for
+the full list of supported parameters.
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+ :language: python
+ :start-after: [START howto_operator_llm_agent_params]
+ :end-before: [END howto_operator_llm_agent_params]
+
+TaskFlow Decorator
+------------------
+
+The ``@task.llm`` decorator wraps ``LLMOperator``. The function returns the
+prompt string; all other parameters are passed to the operator:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+ :language: python
+ :start-after: [START howto_decorator_llm]
+ :end-before: [END howto_decorator_llm]
+
+With structured output:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
+ :language: python
+ :start-after: [START howto_decorator_llm_structured]
+ :end-before: [END howto_decorator_llm_structured]
+
+Classification with ``Literal``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Set ``output_type`` to a ``Literal`` to constrain the LLM to a fixed set of
+labels — useful for classification tasks:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
+ :language: python
+ :start-after: [START howto_decorator_llm_classification]
+ :end-before: [END howto_decorator_llm_classification]
+
+Multi-task pipeline with dynamic mapping
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Combine ``@task.llm`` with upstream and downstream tasks. Use ``.expand()``
+to process a list of items in parallel:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
+ :language: python
+ :start-after: [START howto_decorator_llm_pipeline]
+ :end-before: [END howto_decorator_llm_pipeline]
+
+Parameters
+----------
+
+- ``prompt``: The prompt to send to the LLM (operator) or the return value of
the
+ decorated function (decorator).
+- ``llm_conn_id``: Airflow connection ID for the LLM provider.
+- ``model_id``: Model identifier (e.g. ``"openai:gpt-5"``). Overrides the
connection's extra field.
+- ``system_prompt``: System-level instructions for the agent. Supports Jinja
templating.
+- ``output_type``: Expected output type (default: ``str``). Set to a Pydantic
``BaseModel``
+ for structured output.
+- ``agent_params``: Additional keyword arguments passed to the pydantic-ai
``Agent``
+ constructor (e.g. ``retries``, ``model_settings``, ``tools``). Supports
Jinja templating.
diff --git a/providers/common/ai/provider.yaml
b/providers/common/ai/provider.yaml
index 247371f289b..e2c4ca8fe60 100644
--- a/providers/common/ai/provider.yaml
+++ b/providers/common/ai/provider.yaml
@@ -29,9 +29,14 @@ versions:
- 0.0.1
integrations:
+ - integration-name: Common AI
+ external-doc-url:
https://airflow.apache.org/docs/apache-airflow-providers-common-ai/
+ how-to-guide:
+ - /docs/apache-airflow-providers-common-ai/operators/llm.rst
+ tags: [ai]
- integration-name: Pydantic AI
external-doc-url: https://ai.pydantic.dev/
- tags: [software]
+ tags: [ai]
hooks:
- integration-name: Pydantic AI
@@ -51,3 +56,12 @@ connection-types:
placeholders:
host: "https://api.openai.com/v1 (optional, for custom endpoints)"
extra: '{"model": "openai:gpt-5"}'
+
+operators:
+ - integration-name: Common AI
+ python-modules:
+ - airflow.providers.common.ai.operators.llm
+
+task-decorators:
+ - class-name: airflow.providers.common.ai.decorators.llm.llm_task
+ name: llm
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/decorators/__init__.py
b/providers/common/ai/src/airflow/providers/common/ai/decorators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
new file mode 100644
index 00000000000..04c194aed09
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskFlow decorator for general-purpose LLM calls.
+
+The user writes a function that **returns the prompt string**. The decorator
+handles hook creation, agent configuration, LLM call, and output serialization.
+When ``output_type`` is a Pydantic ``BaseModel``, the result is serialized via
+``model_dump()`` for XCom.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Collection, Mapping, Sequence
+from typing import TYPE_CHECKING, Any, ClassVar
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.compat.sdk import (
+ DecoratedOperator,
+ TaskDecorator,
+ context_merge,
+ task_decorator_factory,
+)
+from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
+from airflow.utils.operator_helpers import determine_kwargs
+
+if TYPE_CHECKING:
+ from airflow.sdk import Context
+
+
+class _LLMDecoratedOperator(DecoratedOperator, LLMOperator):
+ """
+ Wraps a callable that returns a prompt for a general-purpose LLM call.
+
+ The user function is called at execution time to produce the prompt string.
+ All other parameters (``llm_conn_id``, ``model_id``, ``system_prompt``,
etc.)
+ are passed through to
:class:`~airflow.providers.common.ai.operators.llm.LLMOperator`.
+
+ :param python_callable: A reference to a callable that returns the prompt
string.
+ :param op_args: Positional arguments for the callable.
+ :param op_kwargs: Keyword arguments for the callable.
+ """
+
+ template_fields: Sequence[str] = (
+ *DecoratedOperator.template_fields,
+ *LLMOperator.template_fields,
+ )
+ template_fields_renderers: ClassVar[dict[str, str]] = {
+ **DecoratedOperator.template_fields_renderers,
+ }
+
+ custom_operator_name: str = "@task.llm"
+
+ def __init__(
+ self,
+ *,
+ python_callable: Callable,
+ op_args: Collection[Any] | None = None,
+ op_kwargs: Mapping[str, Any] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(
+ python_callable=python_callable,
+ op_args=op_args,
+ op_kwargs=op_kwargs,
+ prompt=SET_DURING_EXECUTION,
+ **kwargs,
+ )
+
+ def execute(self, context: Context) -> Any:
+ context_merge(context, self.op_kwargs)
+ kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+
+ self.prompt = self.python_callable(*self.op_args, **kwargs)
+
+ if not isinstance(self.prompt, str) or not self.prompt.strip():
+ raise TypeError("The returned value from the @task.llm callable
must be a non-empty string.")
+
+ self.render_template_fields(context)
+ return LLMOperator.execute(self, context)
+
+
+def llm_task(
+ python_callable: Callable | None = None,
+ **kwargs,
+) -> TaskDecorator:
+ """
+ Wrap a function that returns a prompt into a general-purpose LLM task.
+
+ The function body constructs the prompt (can use Airflow context, XCom,
etc.).
+ The decorator handles hook creation, agent configuration, LLM call, and
output
+ serialization.
+
+ Usage::
+
+ @task.llm(
+ llm_conn_id="openai_default",
+ system_prompt="Summarize concisely.",
+ )
+ def summarize(text: str):
+ return f"Summarize this article: {text}"
+
+ With structured output::
+
+ @task.llm(
+ llm_conn_id="openai_default",
+ system_prompt="Extract named entities.",
+ output_type=Entities,
+ )
+ def extract(text: str):
+ return f"Extract entities from: {text}"
+
+ :param python_callable: Function to decorate.
+ """
+ return task_decorator_factory(
+ python_callable=python_callable,
+ decorated_operator_class=_LLMDecoratedOperator,
+ **kwargs,
+ )
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm.py
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm.py
new file mode 100644
index 00000000000..f0355a59129
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAGs demonstrating LLMOperator and @task.llm usage."""
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_operator_llm_basic]
+@dag
+def example_llm_operator():
+ LLMOperator(
+ task_id="summarize",
+ prompt="Summarize the key findings from the Q4 earnings report.",
+ llm_conn_id="pydantic_ai_default",
+ system_prompt="You are a financial analyst. Be concise.",
+ )
+
+
+# [END howto_operator_llm_basic]
+
+example_llm_operator()
+
+
+# [START howto_operator_llm_structured]
+@dag
+def example_llm_operator_structured():
+ class Entities(BaseModel):
+ names: list[str]
+ locations: list[str]
+
+ LLMOperator(
+ task_id="extract_entities",
+ prompt="Extract all named entities from the article.",
+ llm_conn_id="pydantic_ai_default",
+ system_prompt="Extract named entities.",
+ output_type=Entities,
+ )
+
+
+# [END howto_operator_llm_structured]
+
+example_llm_operator_structured()
+
+
+# [START howto_operator_llm_agent_params]
+@dag
+def example_llm_operator_agent_params():
+ LLMOperator(
+ task_id="creative_writing",
+ prompt="Write a haiku about data pipelines.",
+ llm_conn_id="pydantic_ai_default",
+ system_prompt="You are a creative writer.",
+ agent_params={"model_settings": {"temperature": 0.9}, "retries": 3},
+ )
+
+
+# [END howto_operator_llm_agent_params]
+
+example_llm_operator_agent_params()
+
+
+# [START howto_decorator_llm]
+@dag
+def example_llm_decorator():
+ @task.llm(llm_conn_id="pydantic_ai_default", system_prompt="Summarize
concisely.")
+ def summarize(text: str):
+ return f"Summarize this article: {text}"
+
+ summarize("Apache Airflow is a platform for programmatically authoring...")
+
+
+# [END howto_decorator_llm]
+
+example_llm_decorator()
+
+
+# [START howto_decorator_llm_structured]
+@dag
+def example_llm_decorator_structured():
+ class Entities(BaseModel):
+ names: list[str]
+ locations: list[str]
+
+ @task.llm(
+ llm_conn_id="pydantic_ai_default",
+ system_prompt="Extract named entities.",
+ output_type=Entities,
+ )
+ def extract(text: str):
+ return f"Extract entities from: {text}"
+
+ extract("Alice visited Paris and met Bob in London.")
+
+
+# [END howto_decorator_llm_structured]
+
+example_llm_decorator_structured()
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
new file mode 100644
index 00000000000..941ba802a0d
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG: triage support tickets with @task.llm, structured output, and
dynamic task mapping."""
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_decorator_llm_pipeline]
+@dag
+def example_llm_analysis_pipeline():
+ class TicketAnalysis(BaseModel):
+ priority: str
+ category: str
+ summary: str
+ suggested_action: str
+
+ @task
+ def get_support_tickets():
+ """Fetch unprocessed support tickets."""
+ return [
+ (
+ "Our nightly ETL pipeline has been failing for the past 3
days. "
+ "The error shows a connection timeout to the Postgres source
database. "
+ "This is blocking our daily financial reports."
+ ),
+ (
+ "We'd like to add a new connection type for our internal ML
model registry. "
+ "Is there documentation on creating custom hooks?"
+ ),
+ (
+ "After upgrading to the latest version, the Grid view takes
over "
+ "30 seconds to load for DAGs with more than 500 tasks. "
+ "Previously it loaded in under 5 seconds."
+ ),
+ ]
+
+ @task.llm(
+ llm_conn_id="pydantic_ai_default",
+ system_prompt=(
+ "Analyze the support ticket and extract: "
+ "priority (critical/high/medium/low), "
+ "category (bug/feature_request/question/performance), "
+ "a one-sentence summary, and a suggested next action."
+ ),
+ output_type=TicketAnalysis,
+ )
+ def analyze_ticket(ticket: str):
+ return f"Analyze this support ticket:\n\n{ticket}"
+
+ @task
+ def store_results(analyses: list[dict]):
+ """Store ticket analyses. In production, this would write to a
database or ticketing system."""
+ for analysis in analyses:
+ print(f"[{analysis['priority'].upper()}] {analysis['category']}:
{analysis['summary']}")
+
+ tickets = get_support_tickets()
+ analyses = analyze_ticket.expand(ticket=tickets)
+ store_results(analyses)
+
+
+# [END howto_decorator_llm_pipeline]
+
+example_llm_analysis_pipeline()
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
new file mode 100644
index 00000000000..52081cb058b
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_classification.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG: classify pipeline incidents by severity using @task.llm with
Literal output."""
+
+from __future__ import annotations
+
+from typing import Literal
+
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_decorator_llm_classification]
+@dag
+def example_llm_classification():
+ @task.llm(
+ llm_conn_id="pydantic_ai_default",
+ system_prompt=(
+ "Classify the severity of the given pipeline incident. "
+ "Use 'critical' for data loss or complete pipeline failure, "
+ "'high' for significant delays or partial failures, "
+ "'medium' for degraded performance, "
+ "'low' for cosmetic issues or minor warnings."
+ ),
+ output_type=Literal["critical", "high", "medium", "low"],
+ )
+ def classify_incident(description: str):
+ # Pre-process the description before sending to the LLM
+ return f"Classify this incident:\n{description.strip()}"
+
+ classify_incident(
+ "Scheduler heartbeat lost for 15 minutes. "
+ "Multiple DAG runs stuck in queued state. "
+ "No new tasks being scheduled across all DAGs."
+ )
+
+
+# [END howto_decorator_llm_classification]
+
+example_llm_classification()
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
index ee2a7c03a7f..f7ae5317401 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
@@ -27,11 +27,17 @@ def get_provider_info():
"name": "Common AI",
"description": "AI/LLM hooks and operators for Airflow pipelines using
`pydantic-ai <https://ai.pydantic.dev/>`__.\n",
"integrations": [
+ {
+ "integration-name": "Common AI",
+ "external-doc-url":
"https://airflow.apache.org/docs/apache-airflow-providers-common-ai/",
+ "how-to-guide":
["/docs/apache-airflow-providers-common-ai/operators/llm.rst"],
+ "tags": ["ai"],
+ },
{
"integration-name": "Pydantic AI",
"external-doc-url": "https://ai.pydantic.dev/",
- "tags": ["software"],
- }
+ "tags": ["ai"],
+ },
],
"hooks": [
{
@@ -53,4 +59,10 @@ def get_provider_info():
},
}
],
+ "operators": [
+ {"integration-name": "Common AI", "python-modules":
["airflow.providers.common.ai.operators.llm"]}
+ ],
+ "task-decorators": [
+ {"class-name":
"airflow.providers.common.ai.decorators.llm.llm_task", "name": "llm"}
+ ],
}
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/operators/__init__.py
b/providers/common/ai/src/airflow/providers/common/ai/operators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/operators/llm.py
b/providers/common/ai/src/airflow/providers/common/ai/operators/llm.py
new file mode 100644
index 00000000000..b7cd69ecf1e
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/operators/llm.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for general-purpose LLM calls."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+from airflow.providers.common.compat.sdk import BaseOperator
+
+if TYPE_CHECKING:
+ from pydantic_ai import Agent
+
+ from airflow.sdk import Context
+
+
+class LLMOperator(BaseOperator):
+ """
+ Call an LLM with a prompt and return the output.
+
+ Uses a
:class:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook`
+ for LLM access. Supports plain string output (default) and structured
output
+ via a Pydantic ``BaseModel``. When ``output_type`` is a ``BaseModel``
subclass,
+ the result is serialized via ``model_dump()`` for XCom.
+
+ :param prompt: The prompt to send to the LLM.
+ :param llm_conn_id: Connection ID for the LLM provider.
+ :param model_id: Model identifier (e.g. ``"openai:gpt-5"``).
+ Overrides the model stored in the connection's extra field.
+ :param system_prompt: System-level instructions for the LLM agent.
+ :param output_type: Expected output type. Default ``str``. Set to a
Pydantic
+ ``BaseModel`` subclass for structured output.
+ :param agent_params: Additional keyword arguments passed to the pydantic-ai
+ ``Agent`` constructor (e.g. ``retries``, ``model_settings``,
``tools``).
+ See `pydantic-ai Agent docs <https://ai.pydantic.dev/api/agent/>`__
+ for the full list.
+ """
+
+ template_fields: Sequence[str] = (
+ "prompt",
+ "llm_conn_id",
+ "model_id",
+ "system_prompt",
+ "agent_params",
+ )
+
+ def __init__(
+ self,
+ *,
+ prompt: str,
+ llm_conn_id: str,
+ model_id: str | None = None,
+ system_prompt: str = "",
+ output_type: type = str,
+ agent_params: dict[str, Any] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.prompt = prompt
+ self.llm_conn_id = llm_conn_id
+ self.model_id = model_id
+ self.system_prompt = system_prompt
+ self.output_type = output_type
+ self.agent_params = agent_params or {}
+
+ @cached_property
+ def llm_hook(self) -> PydanticAIHook:
+ """Return PydanticAIHook for the configured LLM connection."""
+ return PydanticAIHook(llm_conn_id=self.llm_conn_id,
model_id=self.model_id)
+
+ def execute(self, context: Context) -> Any:
+ agent: Agent[None, Any] = self.llm_hook.create_agent(
+ output_type=self.output_type, instructions=self.system_prompt,
**self.agent_params
+ )
+ result = agent.run_sync(self.prompt)
+ output = result.output
+
+ if isinstance(output, BaseModel):
+ return output.model_dump()
+ return output
diff --git a/providers/common/ai/tests/unit/common/ai/decorators/__init__.py
b/providers/common/ai/tests/unit/common/ai/decorators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/decorators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/common/ai/tests/unit/common/ai/decorators/test_llm.py
b/providers/common/ai/tests/unit/common/ai/decorators/test_llm.py
new file mode 100644
index 00000000000..d7161dfef1c
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/decorators/test_llm.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.common.ai.decorators.llm import _LLMDecoratedOperator
+
+
class TestLLMDecoratedOperator:
    @staticmethod
    def _stub_agent(mock_hook_cls, output):
        """Wire *mock_hook_cls* so ``create_agent()`` yields an agent whose
        ``run_sync()`` result has the given ``output``; return that agent."""
        agent = MagicMock(spec=["run_sync"])
        run_result = MagicMock(spec=["output"])
        run_result.output = output
        agent.run_sync.return_value = run_result
        mock_hook_cls.return_value.create_agent.return_value = agent
        return agent

    def test_custom_operator_name(self):
        assert _LLMDecoratedOperator.custom_operator_name == "@task.llm"

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_calls_callable_and_returns_output(self, mock_hook_cls):
        """The callable's return value becomes the LLM prompt."""
        agent = self._stub_agent(mock_hook_cls, "This is a summary.")

        def my_prompt():
            return "Summarize this text"

        op = _LLMDecoratedOperator(
            task_id="test", python_callable=my_prompt, llm_conn_id="my_llm"
        )
        result = op.execute(context={})

        assert result == "This is a summary."
        assert op.prompt == "Summarize this text"
        agent.run_sync.assert_called_once_with("Summarize this text")

    @pytest.mark.parametrize(
        "return_value",
        [42, "", " ", None],
        ids=["non-string", "empty", "whitespace", "none"],
    )
    def test_execute_raises_on_invalid_prompt(self, return_value):
        """TypeError when the callable returns a non-string or blank string."""
        op = _LLMDecoratedOperator(
            task_id="test",
            python_callable=lambda: return_value,
            llm_conn_id="my_llm",
        )
        with pytest.raises(TypeError, match="non-empty string"):
            op.execute(context={})

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_merges_op_kwargs_into_callable(self, mock_hook_cls):
        """op_kwargs are resolved by the callable to build the prompt."""
        agent = self._stub_agent(mock_hook_cls, "done")

        def my_prompt(topic):
            return f"Summarize {topic}"

        op = _LLMDecoratedOperator(
            task_id="test",
            python_callable=my_prompt,
            llm_conn_id="my_llm",
            op_kwargs={"topic": "quantum computing"},
        )
        op.execute(context={"task_instance": MagicMock()})

        assert op.prompt == "Summarize quantum computing"
        agent.run_sync.assert_called_once_with("Summarize quantum computing")
diff --git a/providers/common/ai/tests/unit/common/ai/operators/__init__.py
b/providers/common/ai/tests/unit/common/ai/operators/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_llm.py
b/providers/common/ai/tests/unit/common/ai/operators/test_llm.py
new file mode 100644
index 00000000000..de7b35000af
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/operators/test_llm.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+
+
class TestLLMOperator:
    @staticmethod
    def _stub_agent(mock_hook_cls, output):
        """Wire *mock_hook_cls* so ``create_agent()`` yields an agent whose
        ``run_sync()`` result has the given ``output``; return that agent."""
        agent = MagicMock(spec=["run_sync"])
        run_result = MagicMock(spec=["output"])
        run_result.output = output
        agent.run_sync.return_value = run_result
        mock_hook_cls.return_value.create_agent.return_value = agent
        return agent

    def test_template_fields(self):
        assert set(LLMOperator.template_fields) == {
            "prompt",
            "llm_conn_id",
            "model_id",
            "system_prompt",
            "agent_params",
        }

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_returns_string_output(self, mock_hook_cls):
        """Default output_type=str returns the LLM string directly."""
        agent = self._stub_agent(mock_hook_cls, "Paris is the capital of France.")

        op = LLMOperator(
            task_id="test",
            prompt="What is the capital of France?",
            llm_conn_id="my_llm",
        )
        result = op.execute(context=MagicMock())

        assert result == "Paris is the capital of France."
        agent.run_sync.assert_called_once_with("What is the capital of France?")
        mock_hook_cls.return_value.create_agent.assert_called_once_with(
            output_type=str, instructions=""
        )
        mock_hook_cls.assert_called_once_with(llm_conn_id="my_llm", model_id=None)

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    def test_execute_structured_output_with_all_params(self, mock_hook_cls):
        """Structured output via model_dump(), with model_id, system_prompt, and agent_params."""

        class Entities(BaseModel):
            names: list[str]

        self._stub_agent(mock_hook_cls, Entities(names=["Alice", "Bob"]))

        op = LLMOperator(
            task_id="test",
            prompt="Extract entities",
            llm_conn_id="my_llm",
            model_id="openai:gpt-5",
            system_prompt="You are an extractor.",
            output_type=Entities,
            agent_params={"retries": 3, "model_settings": {"temperature": 0.9}},
        )
        result = op.execute(context=MagicMock())

        assert result == {"names": ["Alice", "Bob"]}
        mock_hook_cls.assert_called_once_with(llm_conn_id="my_llm", model_id="openai:gpt-5")
        mock_hook_cls.return_value.create_agent.assert_called_once_with(
            output_type=Entities,
            instructions="You are an extractor.",
            retries=3,
            model_settings={"temperature": 0.9},
        )