This is an automated email from the ASF dual-hosted git repository.
vikramkoka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4b0db2f40c8 Add AIP progress tracker example DAG for common.ai
provider (#67660)
4b0db2f40c8 is described below
commit 4b0db2f40c8824dbef876176de80dd756615f579
Author: Vikram Koka <[email protected]>
AuthorDate: Thu May 28 15:29:51 2026 -0700
Add AIP progress tracker example DAG for common.ai provider (#67660)
* Add AIP progress tracker example DAG for common.ai provider
Demonstrates Dynamic Task Mapping, structured LLM output (Pydantic),
cost-controlled synthesis (UsageLimits), and HITL approval using
LLMOperator with live data from the Apache Confluence wiki and GitHub.
The DAG accepts an aip_numbers param (default: 76,99,103,105,108) to
choose which AIPs to investigate. For each AIP it fetches the spec via
Confluence CQL search and gathers PR/commit evidence from the GitHub
Search API, then fans out structured LLM analysis via DTM and
synthesizes a cross-AIP progress report for maintainer review.
* Changed spelling
Changed analysing to analyzing to fix spell-check.
Interesting difference between British vs. US
* ignore AIP spell check
---------
Co-authored-by: gopidesupavan <[email protected]>
---
docs/spelling_wordlist.txt | 1 +
.../example_dags/example_aip_progress_tracker.py | 344 +++++++++++++++++++++
2 files changed, 345 insertions(+)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 961c8247507..fdc795161bb 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -27,6 +27,7 @@ agentic
ai
aio
aiobotocore
+AIP
aiplatform
Airbnb
airbnb
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
new file mode 100644
index 00000000000..706a60cb9aa
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
@@ -0,0 +1,344 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+AIP progress tracker -- multi-source data fusion with common.ai operators.
+
+Demonstrates Dynamic Task Mapping, structured LLM output, cost-controlled
+synthesis, and HITL approval using only ``LLMOperator`` -- no LlamaIndex or
+LangChain dependency required.
+
+For each active Airflow Improvement Proposal the Dag gathers evidence from
+two sources (Confluence spec text, GitHub PRs and commits), asks an LLM to
+assess spec-vs-implementation progress, then synthesizes a cross-AIP report
+for maintainer review.
+
+``example_aip_progress_tracker`` (manual trigger):
+
+.. code-block:: text
+
+ fetch_aip_list (@task)
+ → gather_aip_evidence (@task, mapped ×N AIPs)
+ → format_analysis_prompt (@task, mapped ×N)
+ → analyze_aip (LLMOperator, mapped ×N)
+ → collect_analyses (@task)
+ → synthesize_report (LLMOperator, with UsageLimits)
+ → review_report (ApprovalOperator)
+
+**What this makes visible that a notebook hides:**
+
+* Each AIP investigation is a named, logged task instance with its own
+ retry behaviour -- not a loop iteration buried inside one cell.
+* If the GitHub API is rate-limited for one AIP, only that mapped
+ instance retries; the others preserve their XCom results.
+* The synthesis step's inputs and token budget are fully auditable.
+* A maintainer reviews the report before it goes to the dev list.
+
+Before running:
+
+1. Create an LLM connection named ``pydanticai_default`` (or the value of
+ ``LLM_CONN_ID``) for your chosen model provider.
+2. Trigger the Dag with the default ``aip_numbers`` param or edit it to
+ choose which AIPs to investigate.
+"""
+
+from __future__ import annotations
+
+import json
+import re
+import urllib.parse
+import urllib.request
+from datetime import timedelta
+
+from pydantic import BaseModel
+from pydantic_ai.usage import UsageLimits
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.compat.sdk import dag, task
+from airflow.providers.standard.operators.hitl import ApprovalOperator
+from airflow.sdk import Param
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+LLM_CONN_ID = "pydanticai_default"
+
+# Confluence wiki -- public REST API, no auth required.
+CONFLUENCE_BASE_URL = "https://cwiki.apache.org/confluence"
+AIP_LISTING_PAGE_ID = "89066602" # ancestor filter for CQL queries
+GITHUB_REPO = "apache/airflow"
+DEFAULT_AIP_NUMBERS = "76,99,103,105,108"
+
+# ---------------------------------------------------------------------------
+# Structured output model -- enforces a schema on the per-AIP LLM response
+# ---------------------------------------------------------------------------
+
+# [START aip_tracker_structured_output]
+
+
+class AIPStatus(BaseModel):
+    """Per-AIP analysis produced by the LLM."""
+
+    # Used as ``output_type`` for the mapped ``analyze_aip`` operator. The
+    # ``collect_analyses`` task reads every field below by name, so renaming
+    # a field here requires updating that task as well.
+
+    # Rendered together as the "## AIP-<aip_number>: <title>" section heading.
+    aip_number: int
+    title: str
+    # Rendered on the "Summary:" line of the synthesis context.
+    spec_summary: str
+    # Rendered on the "Status:" line, followed by the completion percentage.
+    implementation_status: str
+    # Joined with ", " on the "Key PRs:" line.
+    key_prs: list[str]
+    # An empty list is rendered as "None identified".
+    blockers: list[str]
+    # An empty list is rendered as "N/A".
+    next_steps: list[str]
+    # Rendered as "(<n>% complete)"; no range constraint is declared here.
+    completion_pct: int
+
+# [END aip_tracker_structured_output]
+
+# ---------------------------------------------------------------------------
+# HTTP helpers
+# ---------------------------------------------------------------------------
+
+
+def _confluence_rest_get(path: str) -> dict:
+    """
+    Fetch a Confluence REST API resource and return its decoded JSON body.
+
+    ``path`` is appended verbatim to ``CONFLUENCE_BASE_URL``, so it must start
+    with ``/`` and carry an already URL-encoded query string. The request is
+    sent without credentials and uses a 30-second timeout.
+    """
+    request = urllib.request.Request(
+        f"{CONFLUENCE_BASE_URL}{path}",
+        headers={"Accept": "application/json"},
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        payload = response.read()
+    return json.loads(payload)
+
+
+def _github_api_get(path: str) -> dict:
+    """
+    Fetch a GitHub REST API resource and return its decoded JSON body.
+
+    ``path`` is appended verbatim to ``https://api.github.com``. The request
+    is sent without credentials (public, rate-limited to 10 req/min) and uses
+    a 30-second timeout.
+    """
+    request = urllib.request.Request(
+        f"https://api.github.com{path}",
+        headers={"Accept": "application/vnd.github.v3+json"},
+    )
+    with urllib.request.urlopen(request, timeout=30) as response:
+        payload = response.read()
+    return json.loads(payload)
+
+
+def _strip_html_tags(html: str) -> str:
+    """
+    Remove HTML/Confluence markup, returning plain text.
+
+    Tags are replaced with spaces, character references such as ``&amp;`` or
+    ``&nbsp;`` are decoded, and every run of whitespace is collapsed to a
+    single space. Empty or markup-only input yields an empty string.
+    """
+    # Imported by name inside the function because the ``html`` parameter
+    # shadows the standard-library module of the same name in this scope.
+    from html import unescape
+
+    without_tags = re.sub(r"<[^>]+>", " ", html)
+    # Decode only after tag removal so escaped text such as ``&lt;b&gt;``
+    # survives as a literal "<b>" instead of being stripped as markup.
+    decoded = unescape(without_tags)
+    return re.sub(r"\s+", " ", decoded).strip()
+
+
+# ---------------------------------------------------------------------------
+# System prompts
+# ---------------------------------------------------------------------------
+
+# System prompt for the mapped ``analyze_aip`` operator (one LLM call per AIP).
+# The trailing backslashes join the physical lines into single paragraphs.
+ANALYSIS_SYSTEM_PROMPT = """\
+You are an Airflow project analyst. Given an AIP specification and its \
+GitHub evidence (pull requests and commits), produce a structured status \
+assessment.
+
+Be specific about what has been implemented versus what remains. Rate \
+completion percentage based on the ratio of spec goals that have \
+corresponding PRs or commits."""
+
+# System prompt for the single ``synthesize_report`` operator, which receives
+# the text assembled by ``collect_analyses``.
+SYNTHESIS_SYSTEM_PROMPT = """\
+You are an Airflow release coordinator. Given individual AIP status \
+assessments, produce a concise cross-AIP progress report.
+
+Identify the top priorities, shared blockers across AIPs, and recommend \
+where maintainer attention is most needed. Keep the report actionable \
+and under 500 words."""
+
+
+# ---------------------------------------------------------------------------
+# DAG
+# ---------------------------------------------------------------------------
+
+
+# [START example_aip_progress_tracker]
+@dag(
+    schedule=None,
+    catchup=False,
+    params={
+        "aip_numbers": Param(
+            DEFAULT_AIP_NUMBERS,
+            type="string",
+            description="Comma-separated AIP numbers to investigate (e.g. 76,99,103,105,108)",
+        ),
+    },
+    tags=["example", "aip_tracker", "common_ai"],
+)
+def example_aip_progress_tracker():
+    """
+    Track AIP progress by analyzing Confluence specs against GitHub evidence.
+
+    Task graph::
+
+        fetch_aip_list (@task)
+            → gather_aip_evidence (@task ×N, via Dynamic Task Mapping)
+            → format_analysis_prompt (@task ×N)
+            → analyze_aip (LLMOperator ×N, structured output)
+            → collect_analyses (@task)
+            → synthesize_report (LLMOperator, with UsageLimits)
+            → review_report (ApprovalOperator)
+    """
+
+    # ------------------------------------------------------------------
+    # Step 1: Fetch the list of active AIPs to investigate.
+    # The length of this list determines how many mapped instances are
+    # created in the downstream steps -- N is decided at runtime.
+    # ------------------------------------------------------------------
+    @task
+    def fetch_aip_list(params: dict) -> list[dict]:
+        # Blank entries ("76,,99" or a trailing comma) are skipped; a
+        # non-numeric entry raises ValueError before any request is made.
+        aip_numbers = [int(n.strip()) for n in params["aip_numbers"].split(",") if n.strip()]
+        aips = []
+        for num in aip_numbers:
+            cql = urllib.parse.quote(
+                f'space="AIRFLOW" AND title~"AIP-{num}" AND ancestor={AIP_LISTING_PAGE_ID}'
+            )
+            results = _confluence_rest_get(f"/rest/api/content/search?cql={cql}&limit=1")
+            if results.get("results"):
+                title = results["results"][0]["title"]
+            else:
+                # No wiki page matched: keep the AIP with a placeholder title
+                # so it still gets a mapped instance downstream.
+                title = f"AIP-{num}"
+            aips.append({"aip_number": num, "title": title})
+        return aips
+
+    aip_list = fetch_aip_list()
+
+    # ------------------------------------------------------------------
+    # Step 2: Gather evidence for each AIP from multiple sources.
+    # Each mapped instance fetches one AIP's spec text from the
+    # Confluence wiki (cwiki.apache.org) and searches GitHub for
+    # related PRs and commits. If the GitHub API is rate-limited
+    # for one AIP, only that instance retries.
+    # ------------------------------------------------------------------
+    # Retries are set explicitly: without them a rate-limit error from the
+    # unauthenticated GitHub search API would fail the mapped instance on the
+    # first attempt. The one-minute delay gives the limit time to reset.
+    @task(retries=3, retry_delay=timedelta(minutes=1))
+    def gather_aip_evidence(aip: dict) -> dict:
+        aip_number = aip["aip_number"]
+        cql = urllib.parse.quote(
+            f'space="AIRFLOW" AND title~"AIP-{aip_number}" AND ancestor={AIP_LISTING_PAGE_ID}'
+        )
+        results = _confluence_rest_get(f"/rest/api/content/search?cql={cql}&expand=body.view&limit=1")
+        spec_text = ""
+        if results.get("results"):
+            raw_html = results["results"][0]["body"]["view"]["value"]
+            # Cap the spec at 3000 characters to bound the prompt size.
+            spec_text = _strip_html_tags(raw_html)[:3000]
+        pr_query = urllib.parse.quote(f"AIP-{aip_number} repo:{GITHUB_REPO} is:pr")
+        pr_data = _github_api_get(f"/search/issues?q={pr_query}&per_page=10")
+        prs = [f"#{it['number']} -- {it['title']}" for it in pr_data.get("items", [])]
+        commit_query = urllib.parse.quote(f"AIP-{aip_number} repo:{GITHUB_REPO}")
+        commit_data = _github_api_get(f"/search/commits?q={commit_query}&per_page=10")
+        # Keep only the subject line of each commit message.
+        commits = [it["commit"]["message"].split("\n")[0] for it in commit_data.get("items", [])]
+        return {
+            "aip_number": aip_number,
+            "title": aip["title"],
+            "spec_text": spec_text,
+            "prs": prs,
+            "commits": commits,
+        }
+
+    evidence = gather_aip_evidence.expand(aip=aip_list)
+
+    # ------------------------------------------------------------------
+    # Step 3: Format the gathered evidence into an LLM analysis prompt.
+    # Separating formatting from data gathering keeps each task focused
+    # and makes prompt iteration independent of API logic.
+    # ------------------------------------------------------------------
+    @task
+    def format_analysis_prompt(evidence: dict) -> str:
+        prs_text = "\n".join(f" - {pr}" for pr in evidence["prs"])
+        commits_text = "\n".join(f" - {c}" for c in evidence["commits"])
+        return (
+            f"Analyze AIP-{evidence['aip_number']}: {evidence['title']}\n\n"
+            f"Specification:\n{evidence['spec_text']}\n\n"
+            f"Pull Requests:\n{prs_text}\n\n"
+            f"Recent Commits:\n{commits_text}"
+        )
+
+    prompts = format_analysis_prompt.expand(evidence=evidence)
+
+    # ------------------------------------------------------------------
+    # Step 4: Analyze each AIP with a structured LLM call.
+    # Dynamic Task Mapping creates one LLMOperator instance per AIP.
+    # output_type=AIPStatus enforces the Pydantic schema on the response.
+    # ------------------------------------------------------------------
+    # [START aip_tracker_dtm_analysis]
+    analyses = LLMOperator.partial(
+        task_id="analyze_aip",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=ANALYSIS_SYSTEM_PROMPT,
+        output_type=AIPStatus,
+    ).expand(prompt=prompts)
+    # [END aip_tracker_dtm_analysis]
+
+    # ------------------------------------------------------------------
+    # Step 5: Collect all per-AIP analyses into a single context string
+    # for the synthesis step.
+    # ------------------------------------------------------------------
+    @task
+    def collect_analyses(analyses: list) -> str:
+        sections = []
+        for raw in analyses:
+            # Accept either a JSON string or an already-decoded mapping.
+            a = json.loads(raw) if isinstance(raw, str) else raw
+            blockers = ", ".join(a["blockers"]) if a["blockers"] else "None identified"
+            next_steps = ", ".join(a["next_steps"]) if a["next_steps"] else "N/A"
+            sections.append(
+                f"## AIP-{a['aip_number']}: {a['title']}\n"
+                f"Status: {a['implementation_status']} "
+                f"({a['completion_pct']}% complete)\n"
+                f"Summary: {a['spec_summary']}\n"
+                f"Key PRs: {', '.join(a['key_prs'])}\n"
+                f"Blockers: {blockers}\n"
+                f"Next steps: {next_steps}"
+            )
+        return "\n\n".join(sections)
+
+    collected = collect_analyses(analyses.output)
+
+    # ------------------------------------------------------------------
+    # Step 6: Synthesize a cross-AIP progress report.
+    # UsageLimits caps the token spend so a runaway prompt cannot
+    # exhaust the API budget in a single Dag run.
+    # ------------------------------------------------------------------
+    # [START aip_tracker_synthesis]
+    synthesize = LLMOperator(
+        task_id="synthesize_report",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=SYNTHESIS_SYSTEM_PROMPT,
+        prompt="""\
+Create a cross-AIP progress report from these individual assessments.
+Prioritize AIPs that are close to completion or have shared blockers.
+
+{{ ti.xcom_pull(task_ids='collect_analyses') }}""",
+        usage_limits=UsageLimits(
+            request_limit=5,
+            input_tokens_limit=20_000,
+            output_tokens_limit=4_000,
+        ),
+    )
+    # [END aip_tracker_synthesis]
+    # The prompt pulls the XCom through a template, which creates no implicit
+    # dependency, so the ordering is declared explicitly.
+    collected >> synthesize
+
+    # ------------------------------------------------------------------
+    # Step 7: A maintainer reviews the synthesized report before it is
+    # shared on the dev list. The Dag pauses here until the human
+    # approves or rejects the report, or the timeout expires.
+    # ------------------------------------------------------------------
+    # [START aip_tracker_hitl]
+    ApprovalOperator(
+        task_id="review_report",
+        subject="Review AIP Progress Report before sharing",
+        body=synthesize.output,
+        response_timeout=timedelta(hours=24),
+    )
+    # [END aip_tracker_hitl]
+
+# [END example_aip_progress_tracker]
+
+example_aip_progress_tracker()