This is an automated email from the ASF dual-hosted git repository. vikramkoka pushed a commit to branch aip99-langchain-example in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f52fb93bfd975f2bd17895656913b6479ce47213 Author: Vikram Koka <[email protected]> AuthorDate: Wed May 27 13:03:42 2026 -0700 Add LangChain SEC 10-K financial analysis example DAGs LangChain counterpart to `example_llamaindex_10k.py` — same DAG shape, same sample data, same Pydantic models, different framework underneath. Demonstrates that the framework choice is a swappable implementation detail while Airflow provides orchestration, HITL, and observability. Two DAGs: - `example_langchain_10k_index` (weekly): builds per-company FAISS vector indexes via Dynamic Task Mapping using LangChainHook + RecursiveCharacterTextSplitter + FAISS. - `example_langchain_10k_analysis` (manual trigger): HITL entry → @task.llm sub-question decomposition with structured output → Dynamic Task Mapping fan-out retrieval (one FAISS query per sub-question) → collect → LLMOperator synthesis with UsageLimits and AnalysisReport schema → ApprovalOperator gate. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../ai/example_dags/example_langchain_10k.py | 592 +++++++++++++++++++++ 1 file changed, 592 insertions(+) diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py new file mode 100644 index 00000000000..05a30df5b19 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py @@ -0,0 +1,592 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +SEC 10-K financial analysis -- LangChain RAG with Dynamic Task Mapping. + +Two production-shaped Dags that demonstrate a multi-company financial +research pipeline: one builds per-company FAISS vector indexes on a +schedule, the other decomposes a comparison question at runtime and fans +out retrieval via Dynamic Task Mapping. + +This is the LangChain counterpart to ``example_llamaindex_10k.py``. +Both share the same DAG shape (decompose -> fan-out retrieval -> collect +-> synthesize -> approve) and the same sample filing data, demonstrating +that the framework choice is a swappable implementation detail while +Airflow provides the orchestration. + +``example_langchain_10k_index`` (weekly schedule): + +.. code-block:: text + + prepare_filings (@task) + -> build_index (@task, mapped x N companies) + Uses LangChainHook + RecursiveCharacterTextSplitter + FAISS + +``example_langchain_10k_analysis`` (manual trigger): + +.. code-block:: text + + analyst_question (HITLEntryOperator) + -> decompose_question (@task.llm, structured output) + -> extract_sub_questions (@task) + -> build_retrieval_kwargs (@task) + -> retrieve (@task, mapped x N sub-questions) + -> collect_results (@task) + -> synthesize_report (LLMOperator, UsageLimits + structured output) + -> review_report (ApprovalOperator) + +**What this makes visible that a notebook hides:** + +* The LLM decides how many sub-questions to create -- N is unknown at + parse time, determined at runtime via Dynamic Task Mapping. +* Each retrieval is an independent task instance: if one company's index + is unavailable, only that instance retries. +* The synthesis step's token budget (``UsageLimits``) and output schema + (``AnalysisReport``) are auditable in XCom. +* An analyst reviews the report before it reaches the investment committee. + +**LangChain-specific components used:** + +* ``LangChainHook`` -- vendor-agnostic model dispatch via ``init_embeddings`` +* ``RecursiveCharacterTextSplitter`` -- character-based chunking +* ``FAISS`` -- in-process vector store (no external server) + +**Pattern difference from the LlamaIndex counterpart:** + +The LlamaIndex indexing DAG passes document dicts via XCom to +``LlamaIndexEmbeddingOperator``. Here, the indexing DAG passes a company +key and each ``@task`` re-reads from the ``SAMPLE_FILINGS`` global, +because LangChain's ``FAISS.from_documents`` requires ``Document`` +objects that are not JSON-serializable for XCom. In production, both +patterns converge: ``DocumentLoaderOperator`` produces serializable dicts +that either framework can consume. + +Before running: + +1. Install LangChain packages:: + + pip install langchain langchain-openai langchain-text-splitters \\ + langchain-community faiss-cpu + +2. Create an LLM connection ``pydanticai_default`` for synthesis and + decomposition, and ``langchain_default`` for embedding and retrieval. +3. Run ``example_langchain_10k_index`` once to build the sample indexes. +4. Trigger ``example_langchain_10k_analysis`` to run the query pipeline. +""" + +from __future__ import annotations + +from datetime import timedelta + +from pydantic import BaseModel +from pydantic_ai.usage import UsageLimits + +from airflow.providers.common.ai.operators.llm import LLMOperator +from airflow.providers.common.compat.sdk import dag, task +from airflow.providers.standard.operators.hitl import ApprovalOperator, HITLEntryOperator +from airflow.sdk import Param + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +LLM_CONN_ID = "pydanticai_default" +LANGCHAIN_CONN_ID = "langchain_default" +EMBEDDING_MODEL = "openai:text-embedding-3-small" +INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k_langchain" + +COMPANY_LIST = [ + "acme_manufacturing", + "globex_financial", + "initech_software", + "umbrella_biotech", + "stark_energy", +] + +# --------------------------------------------------------------------------- +# Structured output models +# --------------------------------------------------------------------------- + + +class SubQuestion(BaseModel): + """One sub-question targeting a specific company.""" + + sub_question: str + company: str + + +class DecomposedQuestion(BaseModel): + """LLM-produced decomposition of the analyst's question.""" + + sub_questions: list[SubQuestion] + + +class AnalysisReport(BaseModel): + """Structured financial comparison report.""" + + executive_summary: str + company_findings: list[dict] + key_risks: list[str] + recommendations: list[str] + + +# --------------------------------------------------------------------------- +# Sample 10-K filing data (inline, self-contained) +# +# Each company has Risk Factors and MD&A narrative text mimicking real +# SEC 10-K sections. Same data as the LlamaIndex counterpart -- the +# framework-agnostic story is: same input, same DAG shape, different +# framework handling the chunking and retrieval. +# +# For production use, replace with DocumentLoaderOperator pointed at +# actual filings: +# +# DocumentLoaderOperator( +# task_id="load_filings", +# source_path="/data/filings/acme_manufacturing/*.pdf", +# ) +# --------------------------------------------------------------------------- + +SAMPLE_FILINGS: dict[str, list[dict]] = { + "acme_manufacturing": [ + { + "text": ( + "Risk Factors: Acme Manufacturing relies on a global supply chain " + "spanning 14 countries. Disruptions from geopolitical tensions, " + "port congestion, or raw material shortages could materially affect " + "production timelines. The company's largest supplier accounts for " + "23% of total input costs, creating single-source concentration risk. " + "Tariff escalation in key markets remains a persistent threat to " + "operating margins." + ), + "metadata": {"company": "acme_manufacturing", "section": "risk_factors"}, + }, + { + "text": ( + "MD&A: Revenue for fiscal 2025 increased 8% year-over-year to " + "$4.2 billion, driven by industrial automation demand. Gross margins " + "contracted 120 basis points to 34.1% due to elevated steel and " + "semiconductor costs. The company invested $380M in two new " + "manufacturing facilities in Mexico and Vietnam to diversify its " + "production footprint away from single-region concentration." + ), + "metadata": {"company": "acme_manufacturing", "section": "mda"}, + }, + ], + "globex_financial": [ + { + "text": ( + "Risk Factors: Globex Financial operates under regulatory frameworks " + "in 28 jurisdictions. Changes in capital adequacy requirements, " + "anti-money laundering rules, or data residency laws could require " + "significant compliance investment. Interest rate volatility directly " + "impacts the fixed-income portfolio, which represents 62% of assets " + "under management. Cybersecurity threats to trading infrastructure " + "pose systemic risk." + ), + "metadata": {"company": "globex_financial", "section": "risk_factors"}, + }, + { + "text": ( + "MD&A: Total revenue reached $2.8 billion, up 12% year-over-year. " + "Net interest income grew 18% as the rate environment favoured the " + "loan portfolio mix. Fee income from wealth management grew 9%, " + "offsetting a 5% decline in trading revenue due to lower volatility. " + "The efficiency ratio improved to 58.3% from 61.1% through " + "operational automation and branch consolidation." + ), + "metadata": {"company": "globex_financial", "section": "mda"}, + }, + ], + "initech_software": [ + { + "text": ( + "Risk Factors: Initech Software depends on annual subscription " + "renewals for 78% of revenue. Customer churn in the SMB segment " + "increased to 14% as competitors launched lower-cost alternatives. " + "Rapid AI integration into the product suite introduces model " + "reliability and hallucination risks that could erode customer trust. " + "Key-person dependency on the founding engineering team remains high." + ), + "metadata": {"company": "initech_software", "section": "risk_factors"}, + }, + { + "text": ( + "MD&A: ARR surpassed $1.1 billion, a 22% increase. Enterprise " + "segment net retention reached 118%, driven by AI-powered analytics " + "upsells. R&D spend increased to 28% of revenue as the company " + "accelerated LLM integration across the platform. Free cash flow " + "margin expanded to 19% despite the investment increase, reflecting " + "operating leverage in the cloud infrastructure." + ), + "metadata": {"company": "initech_software", "section": "mda"}, + }, + ], + "umbrella_biotech": [ + { + "text": ( + "Risk Factors: Umbrella BioTech's pipeline is concentrated in " + "oncology, with three candidates in Phase II trials. Failure of the " + "lead compound UB-401 would eliminate 45% of projected 2028 revenue. " + "FDA regulatory timelines are unpredictable and recent guidance " + "changes on companion diagnostics may delay approval pathways. " + "Patent cliff on the existing portfolio begins in 2027." + ), + "metadata": {"company": "umbrella_biotech", "section": "risk_factors"}, + }, + { + "text": ( + "MD&A: Total revenue was $890 million, down 3% as the legacy " + "immunology franchise faced biosimilar competition. R&D expenditure " + "rose 31% to $420 million to fund three pivotal oncology trials. " + "The company secured a $500 million licensing deal for UB-401 rights " + "in Asia-Pacific, providing non-dilutive funding through 2027. " + "Cash runway is 34 months at current burn rate." + ), + "metadata": {"company": "umbrella_biotech", "section": "mda"}, + }, + ], + "stark_energy": [ + { + "text": ( + "Risk Factors: Stark Energy's project pipeline depends on federal " + "tax credits (ITC/PTC) that face political uncertainty. Permitting " + "delays for utility-scale solar and wind projects averaged 18 months " + "in 2025, up from 12 months in 2023. Grid interconnection queues " + "exceed 2,500 GW nationally, creating bottlenecks that may strand " + "permitted projects. Lithium and rare-earth supply constraints " + "threaten battery storage deployment timelines." + ), + "metadata": {"company": "stark_energy", "section": "risk_factors"}, + }, + { + "text": ( + "MD&A: Revenue grew 35% to $1.6 billion as 4.2 GW of solar and " + "wind capacity reached commercial operation. EBITDA margins expanded " + "to 28% from 22% as scale effects reduced per-MW installation " + "costs. The company added 8 GWh of contracted battery storage " + "backlog, now totalling 14 GWh. Capital expenditure of $1.1 billion " + "was funded through project finance and a $400 million green bond " + "issuance at 5.2% yield." + ), + "metadata": {"company": "stark_energy", "section": "mda"}, + }, + ], +} + +# --------------------------------------------------------------------------- +# System prompts +# --------------------------------------------------------------------------- + +DECOMPOSE_SYSTEM_PROMPT = """\ +You are a financial research assistant. Given a comparison question and a \ +list of companies, decompose it into specific sub-questions, one per \ +company. Each sub-question should target information that would be found \ +in the company's 10-K filing (Risk Factors or MD&A sections).""" + +SYNTHESIS_SYSTEM_PROMPT = """\ +You are a senior financial analyst. Given retrieval results from multiple \ +companies' 10-K filings, synthesize a structured comparison report. \ +Cite specific data points from the source text. Be precise about which \ +company each finding relates to.""" + +DEFAULT_QUESTION = ( + "Compare the risk factors and revenue trends across all five companies. " + "Which company faces the most concentrated risk and which shows the " + "strongest growth trajectory?" +) + +# ========================================================================= +# DAG 1: Build per-company FAISS indexes (scheduled) +# ========================================================================= + + +# [START example_langchain_10k_index] +@dag(schedule="@weekly", catchup=False, tags=["example", "langchain", "10k"]) +def example_langchain_10k_index(): + """ + Build per-company FAISS vector indexes from 10-K filing text. + + Runs weekly to refresh indexes when new filings arrive. Each company + gets its own persisted FAISS index via Dynamic Task Mapping. + + Uses LangChain's ``RecursiveCharacterTextSplitter`` for chunking and + ``LangChainHook.get_embedding_model()`` for vendor-agnostic embedding + via ``init_embeddings``. + + Task graph:: + + prepare_filings (@task) + -> build_index (@task x N companies) + """ + + # [START 10k_langchain_index_dtm] + @task + def prepare_filings() -> list[dict]: + return [ + { + "company": company, + "persist_dir": f"{INDEX_BASE_DIR}/{company}", + } + for company in COMPANY_LIST + ] + + @task + def build_index(company: str, persist_dir: str) -> dict: + import os + + from langchain_community.vectorstores import FAISS + from langchain_core.documents import Document + from langchain_text_splitters import RecursiveCharacterTextSplitter + + from airflow.providers.common.ai.hooks.langchain import LangChainHook + + hook = LangChainHook( + llm_conn_id=LANGCHAIN_CONN_ID, + embed_model=EMBEDDING_MODEL, + ) + embeddings = hook.get_embedding_model() + + docs = [ + Document(page_content=filing["text"], metadata=filing["metadata"]) + for filing in SAMPLE_FILINGS[company] + ] + + splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50) + chunks = splitter.split_documents(docs) + + vectorstore = FAISS.from_documents(chunks, embeddings) + + os.makedirs(persist_dir, exist_ok=True) + vectorstore.save_local(persist_dir) + + return { + "company": company, + "document_count": len(docs), + "chunk_count": len(chunks), + "persist_dir": persist_dir, + } + + build_index.expand_kwargs(prepare_filings()) + # [END 10k_langchain_index_dtm] + + +# [END example_langchain_10k_index] + +example_langchain_10k_index() + + +# ========================================================================= +# DAG 2: Analyst query pipeline (on-demand) +# ========================================================================= + + +# [START example_langchain_10k_analysis] +@dag(schedule=None, catchup=False, tags=["example", "langchain", "10k"]) +def example_langchain_10k_analysis(): + """ + Multi-company financial comparison via LLM-driven sub-question decomposition. + + An analyst submits a comparison question. The LLM decomposes it into + company-specific sub-questions (N decided at runtime), each sub-question + retrieves from the appropriate company's FAISS index in parallel via + Dynamic Task Mapping, and the results are synthesized into a structured + report for human review. + + Uses ``LangChainHook`` for vendor-agnostic embedding dispatch and + LangChain's ``FAISS.load_local`` / ``similarity_search_with_score`` + for retrieval. + + Task graph:: + + analyst_question (HITLEntryOperator) + -> decompose_question (@task.llm, structured output) + -> extract_sub_questions (@task) + -> build_retrieval_kwargs (@task) + -> retrieve (@task x N, Dynamic Task Mapping) + -> collect_results (@task) + -> synthesize_report (LLMOperator, UsageLimits + AnalysisReport) + -> review_report (ApprovalOperator) + """ + + # ------------------------------------------------------------------ + # Step 1: Analyst submits the comparison question via HITL. + # ------------------------------------------------------------------ + # [START 10k_langchain_hitl_entry] + analyst_question = HITLEntryOperator( + task_id="analyst_question", + subject="Enter a 10-K comparison question", + params={ + "question": Param( + DEFAULT_QUESTION, + type="string", + description="Financial comparison question across the five companies", + ), + }, + response_timeout=timedelta(hours=1), + ) + # [END 10k_langchain_hitl_entry] + + @task + def get_question(hitl_response: dict) -> str: + return hitl_response["params_input"]["question"] + + question = get_question(analyst_question.output) + + # ------------------------------------------------------------------ + # Step 2: LLM decomposes the question into company-specific + # sub-questions. N is decided by the LLM at runtime. + # ------------------------------------------------------------------ + # [START 10k_langchain_decompose] + @task.llm( + llm_conn_id=LLM_CONN_ID, + system_prompt=DECOMPOSE_SYSTEM_PROMPT, + output_type=DecomposedQuestion, + ) + def decompose_question(question: str) -> str: + companies = ", ".join(COMPANY_LIST) + return ( + f"Decompose this question into company-specific sub-questions.\n" + f"Available companies: {companies}\n\n" + f"Question: {question}" + ) + + decomposed = decompose_question(question) + # [END 10k_langchain_decompose] + + @task + def extract_sub_questions(decomposed: dict) -> list[dict]: + return decomposed["sub_questions"] + + sub_questions = extract_sub_questions(decomposed) + + # ------------------------------------------------------------------ + # Step 3: Map sub-questions to retrieval kwargs. + # Each sub-question targets a specific company's FAISS index. + # ------------------------------------------------------------------ + @task + def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]: + return [ + { + "query": sq["sub_question"], + "index_dir": f"{INDEX_BASE_DIR}/{sq['company']}", + } + for sq in sub_questions + ] + + retrieval_kwargs = build_retrieval_kwargs(sub_questions) + + # ------------------------------------------------------------------ + # Step 4: Retrieve relevant chunks for each sub-question. + # Dynamic Task Mapping fans out one retrieval task per sub-question, + # each loading the company's FAISS index via LangChainHook. + # ------------------------------------------------------------------ + # [START 10k_langchain_dtm_retrieval] + @task + def retrieve(query: str, index_dir: str) -> dict: + from langchain_community.vectorstores import FAISS + + from airflow.providers.common.ai.hooks.langchain import LangChainHook + + hook = LangChainHook( + llm_conn_id=LANGCHAIN_CONN_ID, + embed_model=EMBEDDING_MODEL, + ) + embeddings = hook.get_embedding_model() + + vectorstore = FAISS.load_local( + index_dir, embeddings, allow_dangerous_deserialization=True + ) + results = vectorstore.similarity_search_with_score(query, k=5) + + # FAISS returns L2 distance (lower = more similar). Normalize to + # 0-1 similarity so the synthesis LLM interprets scores correctly. + return { + "query": query, + "chunks": [ + { + "text": doc.page_content, + "score": round(1.0 / (1.0 + float(score)), 4), + "metadata": doc.metadata, + } + for doc, score in results + ], + } + + retrieval_results = retrieve.expand_kwargs(retrieval_kwargs) + # [END 10k_langchain_dtm_retrieval] + + # ------------------------------------------------------------------ + # Step 5: Collect all retrieval results into a single context. + # Mapped outputs preserve input order, so zip with sub_questions + # re-associates each result with its company. + # ------------------------------------------------------------------ + @task + def collect_results(sub_questions: list[dict], results: list[dict]) -> str: + sections = [] + for sq, r in zip(sub_questions, results): + chunks_text = "\n".join( + f" [{i + 1}] (score {c.get('score') or 0.0:.2f}) {c['text']}" + for i, c in enumerate(r["chunks"]) + ) + sections.append(f"## {sq['company']} -- {sq['sub_question']}\n{chunks_text}") + return "\n\n".join(sections) + + collected = collect_results(sub_questions, retrieval_results) + + # ------------------------------------------------------------------ + # Step 6: Synthesize a structured comparison report. + # UsageLimits caps the token spend; output_type=AnalysisReport + # enforces the Pydantic schema on the LLM response. + # ------------------------------------------------------------------ + # [START 10k_langchain_synthesis] + synthesize = LLMOperator( + task_id="synthesize_report", + llm_conn_id=LLM_CONN_ID, + system_prompt=SYNTHESIS_SYSTEM_PROMPT, + prompt="""\ +Synthesize a cross-company financial comparison from these retrieval results. +Cite specific data points and scores. + +{{ ti.xcom_pull(task_ids='collect_results') }}""", + output_type=AnalysisReport, + usage_limits=UsageLimits( + request_limit=10, + input_tokens_limit=50_000, + output_tokens_limit=8_000, + ), + ) + # [END 10k_langchain_synthesis] + collected >> synthesize + + # ------------------------------------------------------------------ + # Step 7: Analyst reviews the report before it reaches the + # investment committee. + # ------------------------------------------------------------------ + # [START 10k_langchain_hitl_approval] + ApprovalOperator( + task_id="review_report", + subject="Review 10-K comparison report before sharing", + body=synthesize.output, + response_timeout=timedelta(hours=24), + ) + # [END 10k_langchain_hitl_approval] + + +# [END example_langchain_10k_analysis] + +example_langchain_10k_analysis()
