This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch aip99-llamaindex-example
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fb652fb844be544c948881c9890cb080c4ed4178
Author: Vikram Koka <[email protected]>
AuthorDate: Thu May 28 17:42:04 2026 -0700

    Add SEC 10-K financial analysis example DAG using LlamaIndex for Common.ai 
provider
    
     Two Dags demonstrating a multi-company financial research pipeline
      using real SEC 10-K filings fetched live from the EDGAR public API:
    
      - Indexing DAG (weekly): fetches 10-K filings by stock ticker,
        extracts Risk Factors and MD&A sections, builds per-company
        vector indexes via LlamaIndexEmbeddingOperator with DTM fan-out.
      - Analysis DAG (on-demand): analyst submits tickers and a comparison
        question via HITLEntryOperator, LLM decomposes into company-specific
        sub-questions (N decided at runtime), retrieves from per-company
        indexes via DTM, synthesizes a structured report (AnalysisReport
        Pydantic model with UsageLimits), formats for human review, and
        gates on ApprovalOperator.
    
      Features showcased: Dynamic Task Mapping, structured LLM output,
      UsageLimits, HITL input and approval, live external API integration,
      DAG-level Params.
    
      Default tickers: AAPL, MSFT, UBER, LYFT, AMZN.
---
 .../ai/example_dags/example_llamaindex_10k.py      | 547 +++++++++++++++++++++
 1 file changed, 547 insertions(+)

diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
new file mode 100644
index 00000000000..ce6b5381e14
--- /dev/null
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
@@ -0,0 +1,547 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+SEC 10-K financial analysis -- LlamaIndex RAG with live SEC EDGAR data.
+
+Two production-shaped Dags that demonstrate a multi-company financial
+research pipeline using real SEC 10-K filings fetched from the EDGAR
+public API: one fetches and indexes filings on a schedule, the other
+decomposes a comparison question at runtime and fans out retrieval via
+Dynamic Task Mapping.
+
+Filings are fetched from the SEC EDGAR public API (free, no auth
+required) using stock ticker symbols.  Any US publicly-traded company
+is supported -- configure via the ``tickers`` Dag parameter.
+
+``example_llamaindex_10k_index`` (weekly schedule):
+
+.. code-block:: text
+
+    fetch_filings (@task, live from SEC EDGAR)
+        -> build_index (LlamaIndexEmbeddingOperator, mapped x N companies)
+
+``example_llamaindex_10k_analysis`` (manual trigger):
+
+.. code-block:: text
+
+    analyst_question  (HITLEntryOperator)
+        -> get_question        (@task)
+        -> get_tickers         (@task)
+        -> decompose_question  (@task.llm, structured output)
+        -> extract_sub_questions (@task)
+        -> build_retrieval_kwargs (@task)
+        -> retrieve             (LlamaIndexRetrievalOperator x N, DTM)
+        -> collect_results      (@task)
+        -> synthesize_report    (LLMOperator, UsageLimits + structured output)
+        -> format_report        (@task, readable text for reviewer)
+        -> review_report        (ApprovalOperator)
+
+**What this makes visible that a notebook hides:**
+
+* The LLM decides how many sub-questions to create -- N is unknown at
+  parse time, determined at runtime via Dynamic Task Mapping.
+* Each retrieval is an independent task instance: if one company's index
+  is unavailable, only that instance retries.
+* The synthesis step's token budget (``UsageLimits``) and output schema
+  (``AnalysisReport``) are auditable in XCom.
+* An analyst reviews the report before it reaches the investment committee.
+
+Before running:
+
+1. Create an LLM connection ``pydanticai_default`` for synthesis and
+   decomposition, and ``llamaindex_default`` for embedding and retrieval.
+2. Update ``EDGAR_USER_AGENT`` with your name and email (SEC requires a
+   descriptive User-Agent header on all EDGAR API requests).
+3. Run ``example_llamaindex_10k_index`` once to fetch filings and build
+   the vector indexes.
+4. Trigger ``example_llamaindex_10k_analysis`` to run the query pipeline.
+"""
+
+from __future__ import annotations
+
+import json
+import re
+import urllib.request
+from datetime import timedelta
+from typing import Any
+
+from pydantic import BaseModel
+from pydantic_ai.usage import UsageLimits
+
+from airflow.providers.common.ai.operators.llamaindex_embedding import 
LlamaIndexEmbeddingOperator
+from airflow.providers.common.ai.operators.llamaindex_retrieval import 
LlamaIndexRetrievalOperator
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.compat.sdk import dag, task
+from airflow.providers.standard.operators.hitl import ApprovalOperator, 
HITLEntryOperator
+from airflow.sdk import Param
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+LLM_CONN_ID = "pydanticai_default"
+LLAMAINDEX_CONN_ID = "llamaindex_default"
+INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k"
+DEFAULT_TICKERS = "AAPL,MSFT,UBER,LYFT,AMZN"
+
+# SEC EDGAR requires a descriptive User-Agent header with a contact email
+# on every request.  Update this before running in production.
+EDGAR_USER_AGENT = "Apache Airflow Example [email protected]"
+
+# ---------------------------------------------------------------------------
+# Structured output models
+# ---------------------------------------------------------------------------
+
+# [START 10k_structured_output]
+
+
+class SubQuestion(BaseModel):
+    """One sub-question targeting a specific company."""
+
+    sub_question: str
+    ticker: str
+
+
+class DecomposedQuestion(BaseModel):
+    """LLM-produced decomposition of the analyst's question."""
+
+    sub_questions: list[SubQuestion]
+
+
+class AnalysisReport(BaseModel):
+    """Structured financial comparison report."""
+
+    executive_summary: str
+    company_findings: list[dict]
+    key_risks: list[str]
+    recommendations: list[str]
+
+
+# [END 10k_structured_output]
+
+# ---------------------------------------------------------------------------
+# SEC EDGAR helpers
+# ---------------------------------------------------------------------------
+
+
+def _edgar_get_json(url: str) -> Any:
+    """GET a JSON endpoint from SEC EDGAR (public, no auth)."""
+    req = urllib.request.Request(
+        url,
+        headers={"User-Agent": EDGAR_USER_AGENT, "Accept": "application/json"},
+    )
+    with urllib.request.urlopen(req, timeout=30) as resp:
+        return json.loads(resp.read())
+
+
+def _edgar_get_text(url: str) -> str:
+    """GET an HTML/text document from SEC EDGAR."""
+    req = urllib.request.Request(url, headers={"User-Agent": EDGAR_USER_AGENT})
+    with urllib.request.urlopen(req, timeout=60) as resp:
+        return resp.read().decode("utf-8", errors="replace")
+
+
+def _strip_html_tags(html: str) -> str:
+    """Remove HTML tags, returning plain text."""
+    text = re.sub(r"<[^>]+>", " ", html)
+    return re.sub(r"\s+", " ", text).strip()
+
+
+def _resolve_ticker(ticker: str) -> tuple[int, str]:
+    """Look up a company's CIK number and name from its stock ticker.
+
+    Returns (cik, company_name).
+    """
+    data = _edgar_get_json("https://www.sec.gov/files/company_tickers.json";)
+    for entry in data.values():
+        if entry["ticker"].upper() == ticker.upper():
+            return int(entry["cik_str"]), entry["title"]
+    raise ValueError(f"Ticker {ticker!r} not found in SEC EDGAR. Check the 
ticker symbol and try again.")
+
+
+def _find_latest_10k(cik: int) -> tuple[str, str]:
+    """Find the latest 10-K filing URL and date for a given CIK.
+
+    Returns (document_url, filing_date).
+    """
+    padded = str(cik).zfill(10)
+    submissions = 
_edgar_get_json(f"https://data.sec.gov/submissions/CIK{padded}.json";)
+    recent = submissions["filings"]["recent"]
+    for i, form in enumerate(recent["form"]):
+        if form == "10-K":
+            accession = recent["accessionNumber"][i].replace("-", "")
+            primary_doc = recent["primaryDocument"][i]
+            filing_date = recent["filingDate"][i]
+            url = 
f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession}/{primary_doc}";
+            return url, filing_date
+    raise ValueError(f"No 10-K filing found for CIK {padded}")
+
+
+def _extract_filing_sections(full_text: str, ticker: str, company_name: str, 
filing_date: str) -> list[dict]:
+    """Split 10-K text into section-aware documents for embedding.
+
+    Attempts to extract Item 1A (Risk Factors) and Item 7 (MD&A).
+    Falls back to the first 40 000 characters if section markers
+    are not found.
+    """
+    metadata_base = {"ticker": ticker, "company": company_name, "filing_date": 
filing_date}
+    sections = []
+
+    section_patterns = [
+        ("risk_factors", 
r"(?i)(item\s+1a\b.{0,40}risk\s+factors.*?)(?=\bitem\s+1b\b|\bitem\s+2\b)"),
+        ("mda", 
r"(?i)(item\s+7\b.{0,60}management.*?)(?=\bitem\s+7a\b|\bitem\s+8\b)"),
+    ]
+
+    for section_name, pattern in section_patterns:
+        matches = list(re.finditer(pattern, full_text, re.DOTALL))
+        if matches:
+            best = max(matches, key=lambda m: len(m.group(1)))
+            section_text = best.group(1).strip()[:20_000]
+            if len(section_text) > 200:
+                sections.append(
+                    {
+                        "text": section_text,
+                        "metadata": {**metadata_base, "section": section_name},
+                    }
+                )
+
+    if not sections:
+        sections.append(
+            {
+                "text": full_text[:40_000],
+                "metadata": {**metadata_base, "section": "full_filing"},
+            }
+        )
+
+    return sections
+
+
+# ---------------------------------------------------------------------------
+# System prompts
+# ---------------------------------------------------------------------------
+
+DECOMPOSE_SYSTEM_PROMPT = """\
+You are a financial research assistant. Given a comparison question and a \
+list of companies (identified by stock ticker), decompose it into specific \
+sub-questions, one per company. Each sub-question should target information \
+that would be found in the company's 10-K filing (Risk Factors or MD&A \
+sections). Use the exact ticker symbol in the ``ticker`` field."""
+
+SYNTHESIS_SYSTEM_PROMPT = """\
+You are a senior financial analyst. Given retrieval results from multiple \
+companies' 10-K filings, synthesize a structured comparison report. \
+Cite specific data points from the source text. Be precise about which \
+company each finding relates to."""
+
+DEFAULT_QUESTION = (
+    "Compare the risk factors and revenue trends across these companies. "
+    "Which company faces the most concentrated risk and which shows the "
+    "strongest growth trajectory?"
+)
+
+# =========================================================================
+# DAG 1: Fetch and index filings (scheduled)
+# =========================================================================
+
+
+# [START example_llamaindex_10k_index]
+@dag(
+    schedule="@weekly",
+    catchup=False,
+    params={
+        "tickers": Param(
+            DEFAULT_TICKERS,
+            type="string",
+            description="Comma-separated stock tickers to fetch and index 
(e.g. AAPL,MSFT,UBER)",
+        ),
+    },
+    tags=["example", "llamaindex", "10k"],
+)
+def example_llamaindex_10k_index():
+    """
+    Fetch 10-K filings from SEC EDGAR and build per-company vector indexes.
+
+    Runs weekly to refresh indexes when new filings arrive.  Each company
+    gets its own persisted index via Dynamic Task Mapping.
+
+    Task graph::
+
+        fetch_filings (@task, live from SEC EDGAR)
+            -> build_index (LlamaIndexEmbeddingOperator x N companies)
+    """
+
+    # [START 10k_index_dtm]
+    @task
+    def fetch_filings(params: dict) -> list[dict]:
+        tickers = [t.strip().upper() for t in params["tickers"].split(",") if 
t.strip()]
+        result = []
+        for ticker in tickers:
+            cik, company_name = _resolve_ticker(ticker)
+            doc_url, filing_date = _find_latest_10k(cik)
+            html = _edgar_get_text(doc_url)
+            plain_text = _strip_html_tags(html)
+            documents = _extract_filing_sections(plain_text, ticker, 
company_name, filing_date)
+            result.append(
+                {
+                    "documents": documents,
+                    "persist_dir": f"{INDEX_BASE_DIR}/{ticker.lower()}",
+                }
+            )
+        return result
+
+    LlamaIndexEmbeddingOperator.partial(
+        task_id="build_index",
+        embed_model="text-embedding-3-small",
+        llm_conn_id=LLAMAINDEX_CONN_ID,
+        chunk_size=512,
+        chunk_overlap=50,
+    ).expand_kwargs(fetch_filings())
+    # [END 10k_index_dtm]
+
+
+# [END example_llamaindex_10k_index]
+
+example_llamaindex_10k_index()
+
+
+# =========================================================================
+# DAG 2: Analyst query pipeline (on-demand)
+# =========================================================================
+
+
+# [START example_llamaindex_10k_analysis]
+@dag(
+    schedule=None,
+    catchup=False,
+    params={
+        "tickers": Param(
+            DEFAULT_TICKERS,
+            type="string",
+            description="Comma-separated stock tickers (must match indexed 
companies)",
+        ),
+    },
+    tags=["example", "llamaindex", "10k"],
+)
+def example_llamaindex_10k_analysis():
+    """
+    Multi-company financial comparison via LLM-driven sub-question 
decomposition.
+
+    An analyst submits a comparison question.  The LLM decomposes it into
+    company-specific sub-questions (N decided at runtime), each sub-question
+    retrieves from the appropriate company's vector index in parallel via
+    Dynamic Task Mapping, and the results are synthesized into a structured
+    report for human review.
+
+    Task graph::
+
+        analyst_input     (HITLEntryOperator, tickers + question)
+            -> get_question        (@task)
+            -> get_tickers         (@task)
+            -> decompose_question  (@task.llm, structured output)
+            -> extract_sub_questions (@task)
+            -> build_retrieval_kwargs (@task)
+            -> retrieve             (LlamaIndexRetrievalOperator x N, DTM)
+            -> collect_results      (@task)
+            -> synthesize_report    (LLMOperator, UsageLimits + AnalysisReport)
+            -> format_report        (@task, readable text for reviewer)
+            -> review_report        (ApprovalOperator)
+    """
+
+    # ------------------------------------------------------------------
+    # Step 1: Analyst submits the comparison question via HITL.
+    # ------------------------------------------------------------------
+    # [START 10k_hitl_entry]
+    analyst_input = HITLEntryOperator(
+        task_id="analyst_input",
+        subject="Enter a 10-K comparison question",
+        params={
+            "tickers": Param(
+                DEFAULT_TICKERS,
+                type="string",
+                description="Comma-separated stock tickers to compare (must 
match indexed companies)",
+            ),
+            "question": Param(
+                DEFAULT_QUESTION,
+                type="string",
+                description="Financial comparison question across the selected 
companies",
+            ),
+        },
+        response_timeout=timedelta(hours=1),
+    )
+    # [END 10k_hitl_entry]
+
+    @task
+    def get_question(hitl_response: dict) -> str:
+        return hitl_response["params_input"]["question"]
+
+    question = get_question(analyst_input.output)
+
+    @task
+    def get_tickers(hitl_response: dict) -> str:
+        return hitl_response["params_input"]["tickers"]
+
+    tickers = get_tickers(analyst_input.output)
+
+    # ------------------------------------------------------------------
+    # Step 2: LLM decomposes the question into company-specific
+    # sub-questions.  N is decided by the LLM at runtime -- this is the
+    # dynamic adaptation that a static Dag cannot express.
+    # ------------------------------------------------------------------
+    # [START 10k_decompose]
+    @task.llm(
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=DECOMPOSE_SYSTEM_PROMPT,
+        output_type=DecomposedQuestion,
+    )
+    def decompose_question(question: str, tickers: str) -> str:
+        return (
+            f"Decompose this question into company-specific sub-questions.\n"
+            f"Available companies (by ticker): {tickers}\n\n"
+            f"Question: {question}"
+        )
+
+    decomposed = decompose_question(question, tickers)
+    # [END 10k_decompose]
+
+    @task
+    def extract_sub_questions(decomposed: dict) -> list[dict]:
+        return decomposed["sub_questions"]
+
+    sub_questions = extract_sub_questions(decomposed)
+
+    # ------------------------------------------------------------------
+    # Step 3: Map sub-questions to LlamaIndexRetrievalOperator kwargs.
+    # Each sub-question targets a specific company's pre-built index.
+    # ------------------------------------------------------------------
+    @task
+    def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]:
+        return [
+            {
+                "query": sq["sub_question"],
+                "index_persist_dir": 
f"{INDEX_BASE_DIR}/{sq['ticker'].lower()}",
+            }
+            for sq in sub_questions
+        ]
+
+    retrieval_kwargs = build_retrieval_kwargs(sub_questions)
+
+    # ------------------------------------------------------------------
+    # Step 4: Retrieve relevant chunks for each sub-question.
+    # Dynamic Task Mapping fans out one LlamaIndexRetrievalOperator
+    # per sub-question, each targeting the company's vector index.
+    # ------------------------------------------------------------------
+    # [START 10k_dtm_retrieval]
+    retrieval_results = LlamaIndexRetrievalOperator.partial(
+        task_id="retrieve",
+        embed_model="text-embedding-3-small",
+        llm_conn_id=LLAMAINDEX_CONN_ID,
+        top_k=5,
+    ).expand_kwargs(retrieval_kwargs)
+    # [END 10k_dtm_retrieval]
+
+    # ------------------------------------------------------------------
+    # Step 5: Collect all retrieval results into a single context.
+    # Mapped outputs preserve input order, so zip with sub_questions
+    # re-associates each result with its company.
+    # ------------------------------------------------------------------
+    @task
+    def collect_results(sub_questions: list[dict], results: list[dict]) -> str:
+        sections = []
+        for sq, r in zip(sub_questions, results):
+            chunks_text = "\n".join(
+                f"  [{i + 1}] (score {c.get('score') or 0.0:.2f}) {c['text']}"
+                for i, c in enumerate(r["chunks"])
+            )
+            sections.append(f"## {sq['ticker']} -- 
{sq['sub_question']}\n{chunks_text}")
+        return "\n\n".join(sections)
+
+    collected = collect_results(sub_questions, retrieval_results.output)
+
+    # ------------------------------------------------------------------
+    # Step 6: Synthesize a structured comparison report.
+    # UsageLimits caps the token spend; output_type=AnalysisReport
+    # enforces the Pydantic schema on the LLM response.
+    # ------------------------------------------------------------------
+    # [START 10k_synthesis]
+    synthesize = LLMOperator(
+        task_id="synthesize_report",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=SYNTHESIS_SYSTEM_PROMPT,
+        prompt="""\
+Synthesize a cross-company financial comparison from these retrieval results.
+Cite specific data points and scores.
+
+{{ ti.xcom_pull(task_ids='collect_results') }}""",
+        output_type=AnalysisReport,
+        usage_limits=UsageLimits(
+            request_limit=10,
+            input_tokens_limit=50_000,
+            output_tokens_limit=16_000,
+        ),
+    )
+    # [END 10k_synthesis]
+    collected >> synthesize
+
+    # ------------------------------------------------------------------
+    # Step 7: Format the structured report into readable text for the
+    # human reviewer.  The LLM produced a dict (via output_type=
+    # AnalysisReport); this task renders it as clean prose.
+    # ------------------------------------------------------------------
+    @task
+    def format_report(report: dict) -> str:
+        lines = [f"# Executive Summary\n\n{report['executive_summary']}"]
+
+        if report.get("company_findings"):
+            lines.append("\n# Company Findings")
+            for finding in report["company_findings"]:
+                company = finding.get("company") or finding.get("ticker", 
"Unknown")
+                lines.append(f"\n## {company}")
+                for key, value in finding.items():
+                    if key not in ("company", "ticker"):
+                        lines.append(f"- **{key}**: {value}")
+
+        if report.get("key_risks"):
+            lines.append("\n# Key Risks")
+            for risk in report["key_risks"]:
+                lines.append(f"- {risk}")
+
+        if report.get("recommendations"):
+            lines.append("\n# Recommendations")
+            for rec in report["recommendations"]:
+                lines.append(f"- {rec}")
+
+        return "\n".join(lines)
+
+    review_body = format_report(synthesize.output)
+
+    # ------------------------------------------------------------------
+    # Step 8: Analyst reviews the report before it reaches the
+    # investment committee.
+    # ------------------------------------------------------------------
+    # [START 10k_hitl_approval]
+    ApprovalOperator(
+        task_id="review_report",
+        subject="Review 10-K comparison report before sharing",
+        body=review_body,
+        response_timeout=timedelta(hours=24),
+    )
+    # [END 10k_hitl_approval]
+
+
+# [END example_llamaindex_10k_analysis]
+
+example_llamaindex_10k_analysis()

Reply via email to