This is an automated email from the ASF dual-hosted git repository.
vikramkoka pushed a commit to branch aip99-llamaindex-example
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/aip99-llamaindex-example by
this push:
new ba2e9b385ee Update example_llamaindex_rag.py
new c57f894d89e Merge branch 'aip99-llamaindex-example' of https://github.com/apache/airflow into aip99-llamaindex-example
ba2e9b385ee is described below
commit ba2e9b385eec015886132a1fc31f5543a7bf84f4
Author: Vikram Koka <[email protected]>
AuthorDate: Wed May 27 11:40:57 2026 -0700
Update example_llamaindex_rag.py
reverting changes from last commit
---
.../ai/example_dags/example_llamaindex_rag.py | 90 +++++++++++++---------
1 file changed, 52 insertions(+), 38 deletions(-)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
index fe546aa21e4..1f7c0238457 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
@@ -14,26 +14,38 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Example DAGs demonstrating RAG pipelines with LlamaIndex operators."""
+"""Example DAGs demonstrating RAG pipelines with LlamaIndex operators.
+
+Three patterns:
+
+1. Full RAG pipeline -- load -> embed -> retrieve -> answer in one DAG.
+2. Separate index/query DAGs -- production-shaped split (scheduled
+ indexing job + on-demand query DAG).
+3. Multi-source RAG -- combine multiple loaders with source metadata.
+
+The ``LLMOperator`` synthesis step uses a ``pydanticai_default`` connection
+because :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` is
+pydantic-ai-backed; the LlamaIndex operators use ``llamaindex_default``.
+The two connection types are intentional -- they back different frameworks.
+"""
from __future__ import annotations
from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator
-from airflow.providers.common.ai.operators.llamaindex_embedding import EmbeddingOperator
-from airflow.providers.common.ai.operators.llamaindex_retrieval import RetrievalOperator
+from airflow.providers.common.ai.operators.llamaindex_embedding import LlamaIndexEmbeddingOperator
+from airflow.providers.common.ai.operators.llamaindex_retrieval import LlamaIndexRetrievalOperator
from airflow.providers.common.ai.operators.llm import LLMOperator
from airflow.providers.common.compat.sdk import dag, task
# ---------------------------------------------------------------------------
-# 1. Full RAG pipeline: load → embed → retrieve → answer
+# 1. Full RAG pipeline: load -> embed -> retrieve -> answer
# ---------------------------------------------------------------------------
# [START howto_llamaindex_rag_pipeline]
-@dag
+@dag(schedule=None, tags=["example"])
def example_llamaindex_rag_pipeline():
- """
- End-to-end RAG pipeline.
+ """End-to-end RAG pipeline in a single DAG.
1. Parse local text files into document dicts.
2. Chunk and embed the documents, persisting the index to disk.
@@ -46,20 +58,22 @@ def example_llamaindex_rag_pipeline():
file_extensions=[".txt", ".md", ".pdf"],
)
- embed = EmbeddingOperator(
+ embed = LlamaIndexEmbeddingOperator(
task_id="embed_docs",
- documents="{{ ti.xcom_pull(task_ids='load_docs') }}",
- llm_conn_id="pydanticai_default",
+ documents=load.output,
+ embed_model="text-embedding-3-small",
+ llm_conn_id="llamaindex_default",
chunk_size=512,
chunk_overlap=50,
persist_dir="/opt/airflow/data/indexes/kb_index",
)
- retrieve = RetrievalOperator(
+ retrieve = LlamaIndexRetrievalOperator(
task_id="retrieve",
query="What are the main components of Apache Airflow?",
index_persist_dir="/opt/airflow/data/indexes/kb_index",
- llm_conn_id="pydanticai_default",
+ embed_model="text-embedding-3-small",
+ llm_conn_id="llamaindex_default",
top_k=5,
)
@@ -81,7 +95,7 @@ def example_llamaindex_rag_pipeline():
system_prompt="Answer based only on the provided context. Cite sources when possible.",
)
- load >> embed >> retrieve >> context >> answer
+ embed >> retrieve >> context >> answer
# [END howto_llamaindex_rag_pipeline]
@@ -90,35 +104,33 @@ example_llamaindex_rag_pipeline()
# ---------------------------------------------------------------------------
-# 2. PDF knowledge base with separate index and query Dags
+# 2. Production-shaped split: scheduled indexing + on-demand query
# ---------------------------------------------------------------------------
# [START howto_llamaindex_index_dag]
-@dag(schedule="@weekly")
+@dag(schedule="@weekly", tags=["example"])
def example_llamaindex_index_pdf():
- """
- Weekly indexing Dag: parse PDFs and build a persistent vector index.
+ """Weekly indexing DAG -- keep the vector index fresh as PDFs arrive.
- Run this on a schedule to keep the index fresh as new documents arrive.
- The query Dag (below) reads the persisted index on demand.
+ The companion query DAG (below) reads the persisted index on demand.
"""
load = DocumentLoaderOperator(
task_id="load_pdfs",
source_path="/opt/airflow/data/reports/*.pdf",
)
- EmbeddingOperator(
+ build_index = LlamaIndexEmbeddingOperator(
task_id="build_index",
- documents="{{ ti.xcom_pull(task_ids='load_pdfs') }}",
- llm_conn_id="pydanticai_default",
+ documents=load.output,
embed_model="text-embedding-3-small",
+ llm_conn_id="llamaindex_default",
chunk_size=1024,
chunk_overlap=100,
persist_dir="/opt/airflow/data/indexes/reports_index",
)
- load >> "build_index"
+ load >> build_index
# [END howto_llamaindex_index_dag]
@@ -130,18 +142,19 @@ example_llamaindex_index_pdf()
@dag(
schedule=None,
params={"question": "Summarize the key findings from the latest quarterly report."},
+ tags=["example"],
)
def example_llamaindex_query():
- """
-On-demand query Dag: retrieve from a pre-built index and synthesize an answer.
+ """On-demand query DAG -- retrieve from a pre-built index and synthesize.
Trigger manually or via API with a ``question`` parameter.
"""
- retrieve = RetrievalOperator(
+ retrieve = LlamaIndexRetrievalOperator(
task_id="retrieve",
query="{{ params.question }}",
index_persist_dir="/opt/airflow/data/indexes/reports_index",
- llm_conn_id="pydanticai_default",
+ embed_model="text-embedding-3-small",
+ llm_conn_id="llamaindex_default",
top_k=5,
)
@@ -153,7 +166,7 @@ def example_llamaindex_query():
context = format_context(retrieve.output)
- LLMOperator(
+ synthesize = LLMOperator(
task_id="synthesize",
prompt=(
"Question: {{ params.question }}\n\n"
@@ -167,7 +180,7 @@ def example_llamaindex_query():
),
)
- retrieve >> context >> "synthesize"
+ context >> synthesize
# [END howto_llamaindex_query_dag]
@@ -176,18 +189,18 @@ example_llamaindex_query()
# ---------------------------------------------------------------------------
-# 3. Multi-source RAG: CSV + text files with metadata
+# 3. Multi-source RAG: combine CSV product data with text documentation
# ---------------------------------------------------------------------------
# [START howto_llamaindex_multi_source]
-@dag
+@dag(schedule=None, tags=["example"])
def example_llamaindex_multi_source():
- """
- Multi-source RAG: combine CSV product data with text documentation.
+ """Combine multiple loaders with source-tagging metadata.
- Shows how DocumentLoaderOperator handles different file formats and
- how metadata_fields tags documents by source for filtered retrieval.
+ Shows how ``DocumentLoaderOperator`` handles different file formats and
+ how ``metadata_fields`` tags documents by source for filtered retrieval
+ downstream.
"""
load_products = DocumentLoaderOperator(
task_id="load_products",
@@ -208,14 +221,15 @@ def example_llamaindex_multi_source():
merged = merge_documents(load_products.output, load_docs.output)
- EmbeddingOperator(
+ embed_all = LlamaIndexEmbeddingOperator(
task_id="embed_all",
documents=merged,
- llm_conn_id="pydanticai_default",
+ embed_model="text-embedding-3-small",
+ llm_conn_id="llamaindex_default",
persist_dir="/opt/airflow/data/indexes/multi_source_index",
)
- [load_products, load_docs] >> merged >> "embed_all"
+ embed_all
# [END howto_llamaindex_multi_source]