This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a1b131fd58 Fix XCom deserialization of Pydantic models in LangChain 10-K example (#67930)
1a1b131fd58 is described below
commit 1a1b131fd587c3182c8bda84e3470dcb8169066e
Author: Vikram Koka <[email protected]>
AuthorDate: Tue Jun 2 20:08:05 2026 -0700
Fix XCom deserialization of Pydantic models in LangChain 10-K example (#67930)
---
.../ai/example_dags/example_langchain_10k.py | 40 ++++++++++++----------
.../ai/example_dags/example_llamaindex_10k.py | 38 +++++++++++---------
2 files changed, 43 insertions(+), 35 deletions(-)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py
index e807fe47670..3a7b99556a1 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py
@@ -449,6 +449,9 @@ def example_langchain_10k_analysis():
llm_conn_id=LLM_CONN_ID,
system_prompt=DECOMPOSE_SYSTEM_PROMPT,
output_type=DecomposedQuestion,
+ # Push the structured output to XCom as a dict so the example runs on
+ # every supported Airflow version (the model-instance form needs 3.3+).
+ serialize_output=True,
)
def decompose_question(question: str, tickers: str) -> str:
return (
@@ -461,8 +464,8 @@ def example_langchain_10k_analysis():
# [END 10k_langchain_decompose]
@task
- def extract_sub_questions(decomposed: DecomposedQuestion) -> list[SubQuestion]:
- return decomposed.sub_questions
+ def extract_sub_questions(decomposed: dict) -> list[dict]:
+ return decomposed["sub_questions"]
sub_questions = extract_sub_questions(decomposed)
@@ -471,12 +474,12 @@ def example_langchain_10k_analysis():
# Each sub-question targets a specific company's FAISS index.
# ------------------------------------------------------------------
@task
- def build_retrieval_kwargs(sub_questions: list[SubQuestion]) -> list[dict]:
+ def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]:
return [
{
- "query": sq.sub_question,
- "ticker": sq.ticker,
- "index_dir": f"{INDEX_BASE_DIR}/{sq.ticker.lower()}",
+ "query": sq["sub_question"],
+ "ticker": sq["ticker"],
+ "index_dir": f"{INDEX_BASE_DIR}/{sq['ticker'].lower()}",
}
for sq in sub_questions
]
@@ -523,13 +526,13 @@ def example_langchain_10k_analysis():
# Step 5: Collect all retrieval results into a single context.
# ------------------------------------------------------------------
@task
- def collect_results(sub_questions: list[SubQuestion], results: list[dict]) -> str:
+ def collect_results(sub_questions: list[dict], results: list[dict]) -> str:
sections = []
for sq, r in zip(sub_questions, results):
chunks_text = "\n".join(
f" [{i + 1}] (score {c['score']:.2f}) {c['text']}" for i, c in enumerate(r["chunks"])
)
- sections.append(f"## {sq.ticker} -- {sq.sub_question}\n{chunks_text}")
+ sections.append(f"## {sq['ticker']} -- {sq['sub_question']}\n{chunks_text}")
return "\n\n".join(sections)
collected = collect_results(sub_questions, retrieval_results)
@@ -550,6 +553,7 @@ Cite specific data points and scores.
{{ ti.xcom_pull(task_ids='collect_results') }}""",
output_type=AnalysisReport,
+ serialize_output=True,
usage_limits=UsageLimits(
request_limit=10,
input_tokens_limit=50_000,
@@ -561,30 +565,30 @@ Cite specific data points and scores.
# ------------------------------------------------------------------
# Step 7: Format the structured report into readable text for the
- # human reviewer. The LLM produced an AnalysisReport instance (via
- # output_type=AnalysisReport); this task renders it as clean prose.
+ # human reviewer. The LLM produced a dict (via output_type=
+ # AnalysisReport with serialize_output); this task renders it as clean prose.
# ------------------------------------------------------------------
@task
- def format_report(report: AnalysisReport) -> str:
- lines = [f"# Executive Summary\n\n{report.executive_summary}"]
+ def format_report(report: dict) -> str:
+ lines = [f"# Executive Summary\n\n{report['executive_summary']}"]
- if report.company_findings:
+ if report.get("company_findings"):
lines.append("\n# Company Findings")
- for finding in report.company_findings:
+ for finding in report["company_findings"]:
company = finding.get("company") or finding.get("ticker", "Unknown")
lines.append(f"\n## {company}")
for key, value in finding.items():
if key not in ("company", "ticker"):
lines.append(f"- **{key}**: {value}")
- if report.key_risks:
+ if report.get("key_risks"):
lines.append("\n# Key Risks")
- for risk in report.key_risks:
+ for risk in report["key_risks"]:
lines.append(f"- {risk}")
- if report.recommendations:
+ if report.get("recommendations"):
lines.append("\n# Recommendations")
- for rec in report.recommendations:
+ for rec in report["recommendations"]:
lines.append(f"- {rec}")
return "\n".join(lines)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
index 1570c9f252e..4142ac0bbd3 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_10k.py
@@ -406,6 +406,9 @@ def example_llamaindex_10k_analysis():
llm_conn_id=LLM_CONN_ID,
system_prompt=DECOMPOSE_SYSTEM_PROMPT,
output_type=DecomposedQuestion,
+ # Push the structured output to XCom as a dict so the example runs on
+ # every supported Airflow version (the model-instance form needs 3.3+).
+ serialize_output=True,
)
def decompose_question(question: str, tickers: str) -> str:
return (
@@ -418,8 +421,8 @@ def example_llamaindex_10k_analysis():
# [END 10k_decompose]
@task
- def extract_sub_questions(decomposed: DecomposedQuestion) -> list[SubQuestion]:
- return decomposed.sub_questions
+ def extract_sub_questions(decomposed: dict) -> list[dict]:
+ return decomposed["sub_questions"]
sub_questions = extract_sub_questions(decomposed)
@@ -428,11 +431,11 @@ def example_llamaindex_10k_analysis():
# Each sub-question targets a specific company's pre-built index.
# ------------------------------------------------------------------
@task
- def build_retrieval_kwargs(sub_questions: list[SubQuestion]) -> list[dict]:
+ def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]:
return [
{
- "query": sq.sub_question,
- "index_persist_dir": f"{INDEX_BASE_DIR}/{sq.ticker.lower()}",
+ "query": sq["sub_question"],
+ "index_persist_dir": f"{INDEX_BASE_DIR}/{sq['ticker'].lower()}",
}
for sq in sub_questions
]
@@ -459,14 +462,14 @@ def example_llamaindex_10k_analysis():
# re-associates each result with its company.
# ------------------------------------------------------------------
@task
- def collect_results(sub_questions: list[SubQuestion], results: list[dict]) -> str:
+ def collect_results(sub_questions: list[dict], results: list[dict]) -> str:
sections = []
for sq, r in zip(sub_questions, results):
chunks_text = "\n".join(
f" [{i + 1}] (score {c.get('score') or 0.0:.2f}) {c['text']}"
for i, c in enumerate(r["chunks"])
)
- sections.append(f"## {sq.ticker} -- {sq.sub_question}\n{chunks_text}")
+ sections.append(f"## {sq['ticker']} -- {sq['sub_question']}\n{chunks_text}")
return "\n\n".join(sections)
collected = collect_results(sub_questions, retrieval_results.output)
@@ -487,6 +490,7 @@ Cite specific data points and scores.
{{ ti.xcom_pull(task_ids='collect_results') }}""",
output_type=AnalysisReport,
+ serialize_output=True,
usage_limits=UsageLimits(
request_limit=10,
input_tokens_limit=50_000,
@@ -498,30 +502,30 @@ Cite specific data points and scores.
# ------------------------------------------------------------------
# Step 7: Format the structured report into readable text for the
- # human reviewer. The LLM produced an AnalysisReport instance (via
- # output_type=AnalysisReport); this task renders it as clean prose.
+ # human reviewer. The LLM produced a dict (via output_type=
+ # AnalysisReport with serialize_output); this task renders it as clean prose.
# ------------------------------------------------------------------
@task
- def format_report(report: AnalysisReport) -> str:
- lines = [f"# Executive Summary\n\n{report.executive_summary}"]
+ def format_report(report: dict) -> str:
+ lines = [f"# Executive Summary\n\n{report['executive_summary']}"]
- if report.company_findings:
+ if report.get("company_findings"):
lines.append("\n# Company Findings")
- for finding in report.company_findings:
+ for finding in report["company_findings"]:
company = finding.get("company") or finding.get("ticker", "Unknown")
lines.append(f"\n## {company}")
for key, value in finding.items():
if key not in ("company", "ticker"):
lines.append(f"- **{key}**: {value}")
- if report.key_risks:
+ if report.get("key_risks"):
lines.append("\n# Key Risks")
- for risk in report.key_risks:
+ for risk in report["key_risks"]:
lines.append(f"- {risk}")
- if report.recommendations:
+ if report.get("recommendations"):
lines.append("\n# Recommendations")
- for rec in report.recommendations:
+ for rec in report["recommendations"]:
lines.append(f"- {rec}")
return "\n".join(lines)