This is an automated email from the ASF dual-hosted git repository. vikramkoka pushed a commit to branch aip99-llamaindex-example in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 614e181452332ba7bbb07cad15a7fbbb2fed459a Author: Vikram Koka <[email protected]> AuthorDate: Tue May 19 16:21:58 2026 +0100 Add example DAGs for LlamaIndex RAG pipelines - Adds `example_llamaindex_rag.py` with three example DAGs demonstrating RAG patterns using the new LlamaIndex operators - **Full RAG pipeline**: DocumentLoaderOperator → EmbeddingOperator → RetrievalOperator → LLMOperator - **Separate index/query DAGs**: weekly PDF indexing DAG + on-demand parameterized query DAG (production pattern) - **Multi-source RAG**: combines CSV and text files with metadata tagging, merges via @task, then embeds ## Dependencies Requires PR #67120 (DocumentLoaderOperator) and PR #67121 (LlamaIndex operators) to merge first. ## Test plan - [ ] Verify DAG file parses without errors after dependency PRs merge - [ ] Verify all three DAGs appear in the Airflow UI - [ ] Test full RAG pipeline end-to-end with sample text files and an OpenAI connection - [ ] Test parameterized query DAG with custom `question` parameter --- .../ai/example_dags/example_llamaindex_rag.py | 223 +++++++++++++++++++++ 1 file changed, 223 insertions(+) diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py new file mode 100644 index 00000000000..fe546aa21e4 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py @@ -0,0 +1,223 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAGs demonstrating RAG pipelines with LlamaIndex operators."""

from __future__ import annotations

from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator
from airflow.providers.common.ai.operators.llamaindex_embedding import EmbeddingOperator
from airflow.providers.common.ai.operators.llamaindex_retrieval import RetrievalOperator
from airflow.providers.common.ai.operators.llm import LLMOperator
from airflow.providers.common.compat.sdk import dag, task

# ---------------------------------------------------------------------------
# 1. Full RAG pipeline: load → embed → retrieve → answer
# ---------------------------------------------------------------------------


# [START howto_llamaindex_rag_pipeline]
@dag
def example_llamaindex_rag_pipeline():
    """
    End-to-end RAG pipeline.

    1. Parse local text files into document dicts.
    2. Chunk and embed the documents, persisting the index to disk.
    3. Retrieve relevant chunks for a user question.
    4. Synthesize an answer using the retrieved context.
    """
    load = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="/opt/airflow/data/knowledge_base/",
        file_extensions=[".txt", ".md", ".pdf"],
    )

    # The documents are pulled through a template, which does not create a
    # dependency on its own; the explicit chain at the bottom orders the tasks.
    embed = EmbeddingOperator(
        task_id="embed_docs",
        documents="{{ ti.xcom_pull(task_ids='load_docs') }}",
        llm_conn_id="pydanticai_default",
        chunk_size=512,
        chunk_overlap=50,
        persist_dir="/opt/airflow/data/indexes/kb_index",
    )

    # Reads the same directory that ``embed_docs`` persists to.
    retrieve = RetrievalOperator(
        task_id="retrieve",
        query="What are the main components of Apache Airflow?",
        index_persist_dir="/opt/airflow/data/indexes/kb_index",
        llm_conn_id="pydanticai_default",
        top_k=5,
    )

    @task
    def format_context(retrieval_result: dict) -> str:
        """Join the ``text`` of every retrieved chunk with a ``---`` separator."""
        chunks = retrieval_result["chunks"]
        return "\n\n---\n\n".join(chunk["text"] for chunk in chunks)

    context = format_context(retrieve.output)

    answer = LLMOperator(
        task_id="answer",
        prompt=(
            "Using the context below, answer the question: "
            "What are the main components of Apache Airflow?\n\n"
            "Context:\n{{ ti.xcom_pull(task_ids='format_context') }}"
        ),
        llm_conn_id="pydanticai_default",
        system_prompt="Answer based only on the provided context. Cite sources when possible.",
    )

    load >> embed >> retrieve >> context >> answer


# [END howto_llamaindex_rag_pipeline]

example_llamaindex_rag_pipeline()


# ---------------------------------------------------------------------------
# 2. PDF knowledge base with separate index and query Dags
# ---------------------------------------------------------------------------


# [START howto_llamaindex_index_dag]
@dag(schedule="@weekly")
def example_llamaindex_index_pdf():
    """
    Weekly indexing Dag: parse PDFs and build a persistent vector index.

    Run this on a schedule to keep the index fresh as new documents arrive.
    The query Dag (below) reads the persisted index on demand.
    """
    load = DocumentLoaderOperator(
        task_id="load_pdfs",
        source_path="/opt/airflow/data/reports/*.pdf",
    )

    # Keep a handle on the operator: ``>>`` wires task objects together and
    # cannot take a task_id string.
    build_index = EmbeddingOperator(
        task_id="build_index",
        documents="{{ ti.xcom_pull(task_ids='load_pdfs') }}",
        llm_conn_id="pydanticai_default",
        embed_model="text-embedding-3-small",
        chunk_size=1024,
        chunk_overlap=100,
        persist_dir="/opt/airflow/data/indexes/reports_index",
    )

    load >> build_index


# [END howto_llamaindex_index_dag]

example_llamaindex_index_pdf()


# [START howto_llamaindex_query_dag]
@dag(
    schedule=None,
    params={"question": "Summarize the key findings from the latest quarterly report."},
)
def example_llamaindex_query():
    """
    On-demand query Dag: retrieve from a pre-built index and synthesize an answer.

    Trigger manually or via API with a ``question`` parameter.
    """
    retrieve = RetrievalOperator(
        task_id="retrieve",
        query="{{ params.question }}",
        index_persist_dir="/opt/airflow/data/indexes/reports_index",
        llm_conn_id="pydanticai_default",
        top_k=5,
    )

    @task
    def format_context(retrieval_result: dict) -> str:
        """Number each retrieved chunk (``[1]``, ``[2]``, ...) so the answer can cite it."""
        chunks = retrieval_result["chunks"]
        numbered = [f"[{i}] {chunk['text']}" for i, chunk in enumerate(chunks, start=1)]
        return "\n\n".join(numbered)

    context = format_context(retrieve.output)

    # Bound to a name so it can be placed in the dependency chain below; the
    # prompt pulls ``format_context`` via a template, which needs that ordering.
    synthesize = LLMOperator(
        task_id="synthesize",
        prompt=(
            "Question: {{ params.question }}\n\n"
            "Relevant excerpts:\n{{ ti.xcom_pull(task_ids='format_context') }}\n\n"
            "Provide a detailed answer with references to the excerpt numbers."
        ),
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "You are a research assistant. Answer the question using only the "
            "provided excerpts. Reference excerpt numbers in square brackets."
        ),
    )

    retrieve >> context >> synthesize


# [END howto_llamaindex_query_dag]

example_llamaindex_query()


# ---------------------------------------------------------------------------
# 3.
# Multi-source RAG: CSV + text files with metadata
# ---------------------------------------------------------------------------


# [START howto_llamaindex_multi_source]
@dag
def example_llamaindex_multi_source():
    """
    Multi-source RAG: combine CSV product data with text documentation.

    Shows how DocumentLoaderOperator handles different file formats and
    how metadata_fields tags documents by source for filtered retrieval.
    """
    load_products = DocumentLoaderOperator(
        task_id="load_products",
        source_path="/opt/airflow/data/products.csv",
        metadata_fields={"source": "product_catalog", "department": "engineering"},
    )

    load_docs = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="/opt/airflow/data/documentation/",
        file_extensions=[".md", ".txt"],
        metadata_fields={"source": "documentation"},
    )

    @task
    def merge_documents(products: list[dict], docs: list[dict]) -> list[dict]:
        """Concatenate both loader outputs into one list, products first."""
        return products + docs

    merged = merge_documents(load_products.output, load_docs.output)

    # Keep a handle on the operator: ``>>`` wires task objects together and
    # cannot take a task_id string.
    embed_all = EmbeddingOperator(
        task_id="embed_all",
        documents=merged,
        llm_conn_id="pydanticai_default",
        persist_dir="/opt/airflow/data/indexes/multi_source_index",
    )

    [load_products, load_docs] >> merged >> embed_all


# [END howto_llamaindex_multi_source]

example_llamaindex_multi_source()
