This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch aip99-llamaindex-example
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/aip99-llamaindex-example by 
this push:
     new ba2e9b385ee Update example_llamaindex_rag.py
     new c57f894d89e Merge branch 'aip99-llamaindex-example' of 
https://github.com/apache/airflow into aip99-llamaindex-example
ba2e9b385ee is described below

commit ba2e9b385eec015886132a1fc31f5543a7bf84f4
Author: Vikram Koka <[email protected]>
AuthorDate: Wed May 27 11:40:57 2026 -0700

    Update example_llamaindex_rag.py
    
    reverting changes from last commit
---
 .../ai/example_dags/example_llamaindex_rag.py      | 90 +++++++++++++---------
 1 file changed, 52 insertions(+), 38 deletions(-)

diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
index fe546aa21e4..1f7c0238457 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_rag.py
@@ -14,26 +14,38 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Example DAGs demonstrating RAG pipelines with LlamaIndex operators."""
+"""Example DAGs demonstrating RAG pipelines with LlamaIndex operators.
+
+Three patterns:
+
+1. Full RAG pipeline -- load -> embed -> retrieve -> answer in one DAG.
+2. Separate index/query DAGs -- production-shaped split (scheduled
+   indexing job + on-demand query DAG).
+3. Multi-source RAG -- combine multiple loaders with source metadata.
+
+The ``LLMOperator`` synthesis step uses a ``pydanticai_default`` connection
+because :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` is
+pydantic-ai-backed; the LlamaIndex operators use ``llamaindex_default``.
+The two connection types are intentional -- they back different frameworks.
+"""
 
 from __future__ import annotations
 
 from airflow.providers.common.ai.operators.document_loader import 
DocumentLoaderOperator
-from airflow.providers.common.ai.operators.llamaindex_embedding import 
EmbeddingOperator
-from airflow.providers.common.ai.operators.llamaindex_retrieval import 
RetrievalOperator
+from airflow.providers.common.ai.operators.llamaindex_embedding import 
LlamaIndexEmbeddingOperator
+from airflow.providers.common.ai.operators.llamaindex_retrieval import 
LlamaIndexRetrievalOperator
 from airflow.providers.common.ai.operators.llm import LLMOperator
 from airflow.providers.common.compat.sdk import dag, task
 
 # ---------------------------------------------------------------------------
-# 1. Full RAG pipeline: load → embed → retrieve → answer
+# 1. Full RAG pipeline: load -> embed -> retrieve -> answer
 # ---------------------------------------------------------------------------
 
 
 # [START howto_llamaindex_rag_pipeline]
-@dag
+@dag(schedule=None, tags=["example"])
 def example_llamaindex_rag_pipeline():
-    """
-    End-to-end RAG pipeline.
+    """End-to-end RAG pipeline in a single DAG.
 
     1. Parse local text files into document dicts.
     2. Chunk and embed the documents, persisting the index to disk.
@@ -46,20 +58,22 @@ def example_llamaindex_rag_pipeline():
         file_extensions=[".txt", ".md", ".pdf"],
     )
 
-    embed = EmbeddingOperator(
+    embed = LlamaIndexEmbeddingOperator(
         task_id="embed_docs",
-        documents="{{ ti.xcom_pull(task_ids='load_docs') }}",
-        llm_conn_id="pydanticai_default",
+        documents=load.output,
+        embed_model="text-embedding-3-small",
+        llm_conn_id="llamaindex_default",
         chunk_size=512,
         chunk_overlap=50,
         persist_dir="/opt/airflow/data/indexes/kb_index",
     )
 
-    retrieve = RetrievalOperator(
+    retrieve = LlamaIndexRetrievalOperator(
         task_id="retrieve",
         query="What are the main components of Apache Airflow?",
         index_persist_dir="/opt/airflow/data/indexes/kb_index",
-        llm_conn_id="pydanticai_default",
+        embed_model="text-embedding-3-small",
+        llm_conn_id="llamaindex_default",
         top_k=5,
     )
 
@@ -81,7 +95,7 @@ def example_llamaindex_rag_pipeline():
         system_prompt="Answer based only on the provided context. Cite sources 
when possible.",
     )
 
-    load >> embed >> retrieve >> context >> answer
+    embed >> retrieve >> context >> answer
 
 
 # [END howto_llamaindex_rag_pipeline]
@@ -90,35 +104,33 @@ example_llamaindex_rag_pipeline()
 
 
 # ---------------------------------------------------------------------------
-# 2. PDF knowledge base with separate index and query Dags
+# 2. Production-shaped split: scheduled indexing + on-demand query
 # ---------------------------------------------------------------------------
 
 
 # [START howto_llamaindex_index_dag]
-@dag(schedule="@weekly")
+@dag(schedule="@weekly", tags=["example"])
 def example_llamaindex_index_pdf():
-    """
-    Weekly indexing Dag: parse PDFs and build a persistent vector index.
+    """Weekly indexing DAG -- keep the vector index fresh as PDFs arrive.
 
-    Run this on a schedule to keep the index fresh as new documents arrive.
-    The query Dag (below) reads the persisted index on demand.
+    The companion query DAG (below) reads the persisted index on demand.
     """
     load = DocumentLoaderOperator(
         task_id="load_pdfs",
         source_path="/opt/airflow/data/reports/*.pdf",
     )
 
-    EmbeddingOperator(
+    build_index = LlamaIndexEmbeddingOperator(
         task_id="build_index",
-        documents="{{ ti.xcom_pull(task_ids='load_pdfs') }}",
-        llm_conn_id="pydanticai_default",
+        documents=load.output,
         embed_model="text-embedding-3-small",
+        llm_conn_id="llamaindex_default",
         chunk_size=1024,
         chunk_overlap=100,
         persist_dir="/opt/airflow/data/indexes/reports_index",
     )
 
-    load >> "build_index"
+    load >> build_index
 
 
 # [END howto_llamaindex_index_dag]
@@ -130,18 +142,19 @@ example_llamaindex_index_pdf()
 @dag(
     schedule=None,
     params={"question": "Summarize the key findings from the latest quarterly 
report."},
+    tags=["example"],
 )
 def example_llamaindex_query():
-    """
-    On-demand query Dag: retrieve from a pre-built index and synthesize an 
answer.
+    """On-demand query DAG -- retrieve from a pre-built index and synthesize.
 
     Trigger manually or via API with a ``question`` parameter.
     """
-    retrieve = RetrievalOperator(
+    retrieve = LlamaIndexRetrievalOperator(
         task_id="retrieve",
         query="{{ params.question }}",
         index_persist_dir="/opt/airflow/data/indexes/reports_index",
-        llm_conn_id="pydanticai_default",
+        embed_model="text-embedding-3-small",
+        llm_conn_id="llamaindex_default",
         top_k=5,
     )
 
@@ -153,7 +166,7 @@ def example_llamaindex_query():
 
     context = format_context(retrieve.output)
 
-    LLMOperator(
+    synthesize = LLMOperator(
         task_id="synthesize",
         prompt=(
             "Question: {{ params.question }}\n\n"
@@ -167,7 +180,7 @@ def example_llamaindex_query():
         ),
     )
 
-    retrieve >> context >> "synthesize"
+    context >> synthesize
 
 
 # [END howto_llamaindex_query_dag]
@@ -176,18 +189,18 @@ example_llamaindex_query()
 
 
 # ---------------------------------------------------------------------------
-# 3. Multi-source RAG: CSV + text files with metadata
+# 3. Multi-source RAG: combine CSV product data with text documentation
 # ---------------------------------------------------------------------------
 
 
 # [START howto_llamaindex_multi_source]
-@dag
+@dag(schedule=None, tags=["example"])
 def example_llamaindex_multi_source():
-    """
-    Multi-source RAG: combine CSV product data with text documentation.
+    """Combine multiple loaders with source-tagging metadata.
 
-    Shows how DocumentLoaderOperator handles different file formats and
-    how metadata_fields tags documents by source for filtered retrieval.
+    Shows how ``DocumentLoaderOperator`` handles different file formats and
+    how ``metadata_fields`` tags documents by source for filtered retrieval
+    downstream.
     """
     load_products = DocumentLoaderOperator(
         task_id="load_products",
@@ -208,14 +221,15 @@ def example_llamaindex_multi_source():
 
     merged = merge_documents(load_products.output, load_docs.output)
 
-    EmbeddingOperator(
+    embed_all = LlamaIndexEmbeddingOperator(
         task_id="embed_all",
         documents=merged,
-        llm_conn_id="pydanticai_default",
+        embed_model="text-embedding-3-small",
+        llm_conn_id="llamaindex_default",
         persist_dir="/opt/airflow/data/indexes/multi_source_index",
     )
 
-    [load_products, load_docs] >> merged >> "embed_all"
+    embed_all
 
 
 # [END howto_llamaindex_multi_source]

Reply via email to