This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 08d7f633437 Fix common.ai 10-K example DAGs for Pydantic XCom output change (#67924)
08d7f633437 is described below

commit 08d7f633437caa22fe508c1c141f712af3c87f2a
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Jun 2 23:56:54 2026 +0100

    Fix common.ai 10-K example DAGs for Pydantic XCom output change (#67924)
    
    Commit 9318bd6250 (#67644) stopped the common.ai LLM operators from
    calling model_dump() on Pydantic output_type results before pushing to
    XCom, so downstream tasks now receive the model instance instead of a
    dict. The example_langchain_10k and example_llamaindex_10k DAGs still
    subscripted the result as a dict and broke with a TypeError on current
    main. Switch their consumer tasks to attribute access and type the
    parameters with the model classes.
---
 .../ai/example_dags/example_langchain_10k.py       | 36 +++++++++++-----------
 .../ai/example_dags/example_llamaindex_10k.py      | 34 ++++++++++----------
 2 files changed, 35 insertions(+), 35 deletions(-)

diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py
index d7c7b8a4f6c..e807fe47670 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py
@@ -461,8 +461,8 @@ def example_langchain_10k_analysis():
     # [END 10k_langchain_decompose]
 
     @task
-    def extract_sub_questions(decomposed: dict) -> list[dict]:
-        return decomposed["sub_questions"]
+    def extract_sub_questions(decomposed: DecomposedQuestion) -> list[SubQuestion]:
+        return decomposed.sub_questions
 
     sub_questions = extract_sub_questions(decomposed)
 
@@ -471,12 +471,12 @@ def example_langchain_10k_analysis():
     # Each sub-question targets a specific company's FAISS index.
     # ------------------------------------------------------------------
     @task
-    def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]:
+    def build_retrieval_kwargs(sub_questions: list[SubQuestion]) -> list[dict]:
         return [
             {
-                "query": sq["sub_question"],
-                "ticker": sq["ticker"],
-                "index_dir": f"{INDEX_BASE_DIR}/{sq['ticker'].lower()}",
+                "query": sq.sub_question,
+                "ticker": sq.ticker,
+                "index_dir": f"{INDEX_BASE_DIR}/{sq.ticker.lower()}",
             }
             for sq in sub_questions
         ]
@@ -523,13 +523,13 @@ def example_langchain_10k_analysis():
     # Step 5: Collect all retrieval results into a single context.
     # ------------------------------------------------------------------
     @task
-    def collect_results(sub_questions: list[dict], results: list[dict]) -> str:
+    def collect_results(sub_questions: list[SubQuestion], results: list[dict]) -> str:
         sections = []
         for sq, r in zip(sub_questions, results):
             chunks_text = "\n".join(
                 f"  [{i + 1}] (score {c['score']:.2f}) {c['text']}" for i, c in enumerate(r["chunks"])
             )
-            sections.append(f"## {sq['ticker']} -- {sq['sub_question']}\n{chunks_text}")
+            sections.append(f"## {sq.ticker} -- {sq.sub_question}\n{chunks_text}")
         return "\n\n".join(sections)
 
     collected = collect_results(sub_questions, retrieval_results)
@@ -561,30 +561,30 @@ Cite specific data points and scores.
 
     # ------------------------------------------------------------------
     # Step 7: Format the structured report into readable text for the
-    # human reviewer.  The LLM produced a dict (via output_type=
-    # AnalysisReport); this task renders it as clean prose.
+    # human reviewer.  The LLM produced an AnalysisReport instance (via
+    # output_type=AnalysisReport); this task renders it as clean prose.
     # ------------------------------------------------------------------
     @task
-    def format_report(report: dict) -> str:
-        lines = [f"# Executive Summary\n\n{report['executive_summary']}"]
+    def format_report(report: AnalysisReport) -> str:
+        lines = [f"# Executive Summary\n\n{report.executive_summary}"]
 
-        if report.get("company_findings"):
+        if report.company_findings:
             lines.append("\n# Company Findings")
-            for finding in report["company_findings"]:
+            for finding in report.company_findings:
                 company = finding.get("company") or finding.get("ticker", "Unknown")
                 lines.append(f"\n## {company}")
                 for key, value in finding.items():
                     if key not in ("company", "ticker"):
                         lines.append(f"- **{key}**: {value}")
 
-        if report.get("key_risks"):
+        if report.key_risks:
             lines.append("\n# Key Risks")
-            for risk in report["key_risks"]:
+            for risk in report.key_risks:
                 lines.append(f"- {risk}")
 
-        if report.get("recommendations"):
+        if report.recommendations:
             lines.append("\n# Recommendations")
-            for rec in report["recommendations"]:
+            for rec in report.recommendations:
                 lines.append(f"- {rec}")
 
         return "\n".join(lines)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
index ce6b5381e14..1570c9f252e 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
@@ -418,8 +418,8 @@ def example_llamaindex_10k_analysis():
     # [END 10k_decompose]
 
     @task
-    def extract_sub_questions(decomposed: dict) -> list[dict]:
-        return decomposed["sub_questions"]
+    def extract_sub_questions(decomposed: DecomposedQuestion) -> list[SubQuestion]:
+        return decomposed.sub_questions
 
     sub_questions = extract_sub_questions(decomposed)
 
@@ -428,11 +428,11 @@ def example_llamaindex_10k_analysis():
     # Each sub-question targets a specific company's pre-built index.
     # ------------------------------------------------------------------
     @task
-    def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]:
+    def build_retrieval_kwargs(sub_questions: list[SubQuestion]) -> list[dict]:
         return [
             {
-                "query": sq["sub_question"],
-                "index_persist_dir": f"{INDEX_BASE_DIR}/{sq['ticker'].lower()}",
+                "query": sq.sub_question,
+                "index_persist_dir": f"{INDEX_BASE_DIR}/{sq.ticker.lower()}",
             }
             for sq in sub_questions
         ]
@@ -459,14 +459,14 @@ def example_llamaindex_10k_analysis():
     # re-associates each result with its company.
     # ------------------------------------------------------------------
     @task
-    def collect_results(sub_questions: list[dict], results: list[dict]) -> str:
+    def collect_results(sub_questions: list[SubQuestion], results: list[dict]) -> str:
         sections = []
         for sq, r in zip(sub_questions, results):
             chunks_text = "\n".join(
                 f"  [{i + 1}] (score {c.get('score') or 0.0:.2f}) {c['text']}"
                 for i, c in enumerate(r["chunks"])
             )
-            sections.append(f"## {sq['ticker']} -- {sq['sub_question']}\n{chunks_text}")
+            sections.append(f"## {sq.ticker} -- {sq.sub_question}\n{chunks_text}")
         return "\n\n".join(sections)
 
     collected = collect_results(sub_questions, retrieval_results.output)
@@ -498,30 +498,30 @@ Cite specific data points and scores.
 
     # ------------------------------------------------------------------
     # Step 7: Format the structured report into readable text for the
-    # human reviewer.  The LLM produced a dict (via output_type=
-    # AnalysisReport); this task renders it as clean prose.
+    # human reviewer.  The LLM produced an AnalysisReport instance (via
+    # output_type=AnalysisReport); this task renders it as clean prose.
     # ------------------------------------------------------------------
     @task
-    def format_report(report: dict) -> str:
-        lines = [f"# Executive Summary\n\n{report['executive_summary']}"]
+    def format_report(report: AnalysisReport) -> str:
+        lines = [f"# Executive Summary\n\n{report.executive_summary}"]
 
-        if report.get("company_findings"):
+        if report.company_findings:
             lines.append("\n# Company Findings")
-            for finding in report["company_findings"]:
+            for finding in report.company_findings:
                 company = finding.get("company") or finding.get("ticker", "Unknown")
                 lines.append(f"\n## {company}")
                 for key, value in finding.items():
                     if key not in ("company", "ticker"):
                         lines.append(f"- **{key}**: {value}")
 
-        if report.get("key_risks"):
+        if report.key_risks:
             lines.append("\n# Key Risks")
-            for risk in report["key_risks"]:
+            for risk in report.key_risks:
                 lines.append(f"- {risk}")
 
-        if report.get("recommendations"):
+        if report.recommendations:
             lines.append("\n# Recommendations")
-            for rec in report["recommendations"]:
+            for rec in report.recommendations:
                 lines.append(f"- {rec}")
 
         return "\n".join(lines)

Reply via email to