This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new db6ce848baf Improve AIP progress tracker example for accuracy (#68037)
db6ce848baf is described below

commit db6ce848baf69fc74e0d1d6337da6b84ff749f3b
Author: Vikram Koka <[email protected]>
AuthorDate: Fri Jun 5 13:16:35 2026 -0700

    Improve AIP progress tracker example for accuracy (#68037)
    
    * Improve AIP progress tracker example DAG to produce accurate, 
evidence-backed results
    
    The example DAG was producing hallucinated output -- fabricated completion
    percentages, invented blockers, and missed shipped work -- because the
    evidence pipeline was too thin and the prompts too permissive.
    
    Key changes:
    - Add AIP registry with Confluence page IDs, GitHub search aliases, and
      codebase directory paths for multi-strategy evidence gathering
    - Fetch GitHub file tree (Git Trees API) for codebase-level evidence
    - Replace flat 3000-char spec truncation with section-aware parsing
    - Replace completion_pct/blockers Pydantic model with per-deliverable
      DeliverableStatus (name, status, evidence, confidence)
    - Add grounding rules to analysis/synthesis/validation system prompts
    - Add three-layer quality pipeline: AI validation (LLMOperator) identifies
      ungrounded claims, deterministic apply_validation task does mechanical
      find-and-replace, human reviews the corrected report
    - Add arithmetic validation that cross-checks X/Y fractions against
      structured analysis data (catches validator-introduced errors)
    - Set temperature=0 on all LLM calls for run-to-run consistency
    
    * Add skills-based AIP tracker DAG alongside the pipeline version
    
    Same file now contains two DAGs that solve the same use case -- tracking
    AIP implementation progress -- with different architectures:
    
    1. example_aip_progress_tracker (pipeline): 12-task deterministic pipeline
       with per-AIP LLM analysis, structured Pydantic output, AI validation,
       and arithmetic correction. More accurate, more auditable, fewer tokens
       (~66K total), but more complex.
    
    2. example_aip_progress_tracker_skills (agent): Single AgentOperator with
       the aip-tracker skill loaded via AgentSkillsToolset plus custom tool
       functions for Confluence/GitHub APIs. Simpler DAG (2 tasks), but less
       control over output discipline (~82K tokens, coarser granularity).
    
    The aip-tracker SKILL.md bundle teaches the agent the same grounding
    rules the pipeline enforces structurally: spec-level deliverable
    granularity, fraction-only progress format, evidence-backed assessments,
    and a mandatory self-verification checklist.
    
    Also strengthens the pipeline DAG's arithmetic validation to cross-check
    per-AIP fractions and summary totals against structured analysis data.
    
    * Removed duplicate import and redundant definition
    
    Based on feedback from Kaxil, removed the duplicate import of re and 
resolved the redundant definition of _github_headers
    
    * Updated example to fix CI errors
    
    Fix mypy errors in AIP tracker skills DAG for _safe_api_get return type
    
      Narrow type guard from `isinstance(data, str)` to `not isinstance(data, 
dict)`
      so mypy recognizes that `.get()` calls are valid after the check, since
      `_safe_api_get` returns `dict | list | str`.
---
 .../example_dags/example_aip_progress_tracker.py   | 1057 +++++++++++++++++---
 .../ai/example_dags/skills/aip-tracker/SKILL.md    |  143 +++
 2 files changed, 1072 insertions(+), 128 deletions(-)

diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
index 706a60cb9aa..225f9d03b54 100644
--- 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
@@ -15,58 +15,84 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-AIP progress tracker -- multi-source data fusion with common.ai operators.
+AIP progress tracker -- two approaches to the same problem with common.ai.
 
-Demonstrates Dynamic Task Mapping, structured LLM output, cost-controlled
-synthesis, and HITL approval using only ``LLMOperator`` -- no LlamaIndex or
-LangChain dependency required.
+This file contains **two DAGs** that solve the same use case -- tracking
+Airflow Improvement Proposal implementation progress -- using different
+architectural patterns. Comparing them illustrates the tradeoff between
+deterministic control and agent autonomy.
 
-For each active Airflow Improvement Proposal the Dag gathers evidence from
-two sources (Confluence spec text, GitHub PRs and commits), asks an LLM to
-assess spec-vs-implementation progress, then synthesizes a cross-AIP report
-for maintainer review.
+``example_aip_progress_tracker`` (pipeline approach):
 
-``example_aip_progress_tracker`` (manual trigger):
+.. code-block:: text
+
+    fetch_aip_list  (@task)  ─┐
+                               ├─> gather_aip_evidence  (@task, mapped ×N AIPs)
+    fetch_repo_tree (@task)  ─┘    → format_analysis_prompt (@task, mapped ×N)
+                                   → analyze_aip           (LLMOperator, 
mapped ×N)
+                                   → collect_analyses      (@task)
+                                   → format_report         (@task)
+                                   → synthesize_report     (LLMOperator, 
UsageLimits)
+                                   → validate_report       (LLMOperator, 
hallucination check)
+                                   → apply_validation      (@task, 
deterministic corrections)
+                                   → build_review_body     (@task)
+                                   → review_report         (ApprovalOperator)
+
+The pipeline gathers evidence deterministically, then uses LLMs only for
+analysis and synthesis -- with a three-layer quality pipeline (structured
+output → AI validation → arithmetic correction) to prevent hallucination.
+
+``example_aip_progress_tracker_skills`` (skills approach):
 
 .. code-block:: text
 
-    fetch_aip_list (@task)
-        → gather_aip_evidence  (@task, mapped ×N AIPs)
-        → format_analysis_prompt (@task, mapped ×N)
-        → analyze_aip          (LLMOperator, mapped ×N)
-        → collect_analyses     (@task)
-        → synthesize_report    (LLMOperator, with UsageLimits)
-        → review_report        (ApprovalOperator)
+    track_aip_progress  (AgentOperator + AgentSkillsToolset)
+        → review_report (ApprovalOperator)
+
+The agent loads the ``aip-tracker`` skill (an `agentskills.io
+<https://agentskills.io>`__ ``SKILL.md`` bundle) which teaches it how to
+assess AIP progress. Custom tools give it access to Confluence and GitHub
+APIs. The agent decides its own evidence-gathering strategy -- simpler DAG,
+but less control over accuracy.
 
-**What this makes visible that a notebook hides:**
+**When to use which:**
 
-* Each AIP investigation is a named, logged task instance with its own
-  retry behaviour -- not a loop iteration buried inside one cell.
-* If the GitHub API is rate-limited for one AIP, only that mapped
-  instance retries; the others preserve their XCom results.
-* The synthesis step's inputs and token budget are fully auditable.
-* A maintainer reviews the report before it goes to the dev list.
+* **Pipeline** when accuracy is critical and you want full auditability of
+  every evidence source and LLM judgment.
+* **Agent** when you want a quick assessment and trust the model to follow
+  skill instructions, or when the problem is too open-ended for a fixed
+  pipeline.
 
-Before running:
+Before running either DAG:
 
 1. Create an LLM connection named ``pydanticai_default`` (or the value of
    ``LLM_CONN_ID``) for your chosen model provider.
-2. Trigger the DAG with the default ``aip_numbers`` param or edit it to
+2. Optionally set a ``GITHUB_TOKEN`` environment variable for higher API
+   rate limits (unauthenticated: 10 req/min; authenticated: 5,000 req/hr).
+3. Trigger the DAG with the default ``aip_numbers`` param or edit it to
    choose which AIPs to investigate.
+4. The agent DAG requires the ``skills`` extra:
+   ``pip install "apache-airflow-providers-common-ai[skills]"``.
 """
 
 from __future__ import annotations
 
 import json
+import os
 import re
+import time
 import urllib.parse
 import urllib.request
 from datetime import timedelta
+from pathlib import Path
+from typing import Literal
 
 from pydantic import BaseModel
 from pydantic_ai.usage import UsageLimits
 
+from airflow.providers.common.ai.operators.agent import AgentOperator
 from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.ai.toolsets.skills import AgentSkillsToolset
 from airflow.providers.common.compat.sdk import dag, task
 from airflow.providers.standard.operators.hitl import ApprovalOperator
 from airflow.sdk import Param
@@ -76,40 +102,170 @@ from airflow.sdk import Param
 # ---------------------------------------------------------------------------
 
 LLM_CONN_ID = "pydanticai_default"
-
-# Confluence wiki -- public REST API, no auth required.
 CONFLUENCE_BASE_URL = "https://cwiki.apache.org/confluence";
-AIP_LISTING_PAGE_ID = "89066602"  # ancestor filter for CQL queries
 GITHUB_REPO = "apache/airflow"
+GITHUB_API_DELAY = 7  # seconds between unauthenticated GitHub API calls
 DEFAULT_AIP_NUMBERS = "76,99,103,105,108"
 
 # ---------------------------------------------------------------------------
-# Structured output model -- enforces a schema on the per-AIP LLM response
+# AIP Registry -- page IDs, search aliases, and codebase paths
+#
+# Each entry enables multi-strategy evidence gathering:
+# - page_id: direct Confluence fetch (no CQL search needed)
+# - search_terms: additional GitHub search keywords beyond "AIP-{N}"
+# - codebase_paths: directory prefixes to look for in the GitHub file tree
+# ---------------------------------------------------------------------------
+
+# [START aip_registry]
+AIP_REGISTRY: dict[int, dict] = {
+    76: {
+        "page_id": "311626969",
+        "topic": "Asset Partitions",
+        "search_terms": ["asset partition", "PartitionMapper", 
"PartitionedAsset"],
+        "codebase_paths": [
+            "airflow-core/src/airflow/models/asset.py",
+            "task-sdk/src/airflow/sdk/definitions/partition_mappers",
+            "task-sdk/src/airflow/sdk/definitions/timetables/assets.py",
+            "airflow-core/src/airflow/migrations/versions/0095_",
+            "airflow-core/src/airflow/migrations/versions/0106_",
+            "airflow-core/src/airflow/migrations/versions/0107_",
+        ],
+    },
+    99: {
+        "page_id": "406618285",
+        "topic": "Common AI Operators",
+        "search_terms": ["LLMOperator", "AgentOperator", "common.ai", 
"LangChainHook"],
+        "codebase_paths": [
+            "providers/common/ai/src/airflow/providers/common/ai/operators",
+            "providers/common/ai/src/airflow/providers/common/ai/toolsets",
+            "providers/common/ai/src/airflow/providers/common/ai/hooks",
+            "providers/common/ai/src/airflow/providers/common/ai/decorators",
+            "providers/common/ai/src/airflow/providers/common/ai/durable",
+            "providers/common/ai/src/airflow/providers/common/ai/example_dags",
+        ],
+    },
+    103: {
+        "page_id": "406623137",
+        "topic": "Task State Management",
+        "search_terms": ["task_store", "asset_store", "state_store", 
"TaskStoreAccessor"],
+        "codebase_paths": [
+            "airflow-core/src/airflow/models/task_store.py",
+            "airflow-core/src/airflow/models/asset_store.py",
+            "airflow-core/src/airflow/state",
+            "shared/state/src/airflow_shared/state",
+            
"airflow-core/src/airflow/api_fastapi/execution_api/routes/task_store.py",
+            
"airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py",
+        ],
+    },
+    105: {
+        "page_id": "421955342",
+        "topic": "Pluggable Retry Policies",
+        "search_terms": ["RetryPolicy", "retry_policy", 
"ExceptionRetryPolicy"],
+        "codebase_paths": [
+            "task-sdk/src/airflow/sdk/definitions/retry_policy.py",
+            "airflow-core/src/airflow/migrations/versions/0113_",
+        ],
+    },
+    108: {
+        "page_id": "421957285",
+        "topic": "Language Task SDK + Coordinator",
+        "search_terms": ["coordinator", "Go-SDK", "Go SDK", "Java SDK", 
"SubprocessCoordinator"],
+        "codebase_paths": [
+            "task-sdk/src/airflow/sdk/coordinators",
+            "task-sdk/src/airflow/sdk/execution_time/coordinator.py",
+            "go-sdk",
+            "java-sdk",
+        ],
+    },
+}
+# [END aip_registry]
+
+# ---------------------------------------------------------------------------
+# Structured output models
 # ---------------------------------------------------------------------------
 
 # [START aip_tracker_structured_output]
 
 
+class DeliverableStatus(BaseModel):
+    """Status of a single deliverable from the AIP spec."""
+
+    name: str
+    status: Literal["shipped", "in_progress", "not_started", "beyond_spec", 
"unclear"]
+    evidence: str
+    confidence: Literal["high", "medium", "low"]
+
+
 class AIPStatus(BaseModel):
     """Per-AIP analysis produced by the LLM."""
 
     aip_number: int
     title: str
     spec_summary: str
-    implementation_status: str
+    deliverables: list[DeliverableStatus]
+    shipped_count: int
+    total_count: int
     key_prs: list[str]
-    blockers: list[str]
-    next_steps: list[str]
-    completion_pct: int
+    confluence_update_needed: bool
+    notes: str
 
 
 # [END aip_tracker_structured_output]
 
+# [START aip_tracker_validation_output]
+
+
+class ClaimValidation(BaseModel):
+    """A single claim from the report checked against evidence."""
+
+    claim: str
+    grounded: bool
+    evidence_found: str
+    correction: str
+
+
+class ValidationResult(BaseModel):
+    """Result of the AI hallucination validation step."""
+
+    overall_verdict: Literal["pass", "pass_with_warnings", "fail"]
+    ungrounded_claims: list[ClaimValidation]
+    hallucination_risk: Literal["low", "medium", "high"]
+
+
+# [END aip_tracker_validation_output]
+
 # ---------------------------------------------------------------------------
 # HTTP helpers
 # ---------------------------------------------------------------------------
 
 
+def _github_headers() -> dict[str, str]:
+    """Build GitHub API headers, using a token if available."""
+    headers = {"Accept": "application/vnd.github.v3+json", "User-Agent": 
"airflow-aip-tracker/1.0"}
+    token = os.environ.get("GITHUB_TOKEN")
+    if token:
+        headers["Authorization"] = f"Bearer {token}"
+    return headers
+
+
+def _github_api_get(path: str, *, delay: bool = True) -> dict:
+    """GET a GitHub REST API endpoint with rate-limit awareness."""
+    url = f"https://api.github.com{path}";
+    req = urllib.request.Request(url, headers=_github_headers())
+    try:
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            result = json.loads(resp.read())
+            remaining = resp.headers.get("X-RateLimit-Remaining")
+            if delay and not os.environ.get("GITHUB_TOKEN") and remaining:
+                if int(remaining) < 5:
+                    time.sleep(GITHUB_API_DELAY)
+            return result
+    except urllib.error.HTTPError as e:
+        if e.code == 403:
+            return {}
+        raise
+
+
 def _confluence_rest_get(path: str) -> dict:
     """GET a Confluence REST API endpoint (public, no auth required)."""
     url = f"{CONFLUENCE_BASE_URL}{path}"
@@ -118,40 +274,109 @@ def _confluence_rest_get(path: str) -> dict:
         return json.loads(resp.read())
 
 
-def _github_api_get(path: str) -> dict:
-    """GET a GitHub REST API endpoint (public, rate-limited to 10 req/min)."""
-    url = f"https://api.github.com{path}";
-    req = urllib.request.Request(url, headers={"Accept": 
"application/vnd.github.v3+json"})
-    with urllib.request.urlopen(req, timeout=30) as resp:
-        return json.loads(resp.read())
-
-
 def _strip_html_tags(html: str) -> str:
     """Remove HTML/Confluence markup, returning plain text."""
     text = re.sub(r"<[^>]+>", " ", html)
     return re.sub(r"\s+", " ", text).strip()
 
 
+def _extract_spec_sections(html: str) -> dict:
+    """Parse Confluence HTML into structured sections by heading."""
+    sections: dict[str, str] = {}
+    current_heading = "introduction"
+    current_text: list[str] = []
+
+    for part in re.split(r"(<h[1-4][^>]*>.*?</h[1-4]>)", html, 
flags=re.IGNORECASE | re.DOTALL):
+        heading_match = re.match(r"<h[1-4][^>]*>(.*?)</h[1-4]>", part, 
re.IGNORECASE | re.DOTALL)
+        if heading_match:
+            if current_text:
+                sections[current_heading] = _strip_html_tags(" 
".join(current_text))
+            current_heading = 
_strip_html_tags(heading_match.group(1)).lower().strip()
+            current_text = []
+        else:
+            current_text.append(part)
+
+    if current_text:
+        sections[current_heading] = _strip_html_tags(" ".join(current_text))
+
+    return sections
+
+
 # ---------------------------------------------------------------------------
 # System prompts
 # ---------------------------------------------------------------------------
 
 ANALYSIS_SYSTEM_PROMPT = """\
-You are an Airflow project analyst. Given an AIP specification and its \
-GitHub evidence (pull requests and commits), produce a structured status \
-assessment.
+You are an Airflow project analyst assessing AIP implementation progress.
+
+DELIVERABLE EXTRACTION:
+Extract deliverables from the specification's own structure. Use these \
+sources in priority order:
+1. Numbered completion criteria (e.g. "Definition of Done", "Completion \
+Criteria") -- each numbered item is one deliverable.
+2. Phase definitions -- each bullet or item under a phase heading is one \
+deliverable.
+3. Explicitly enumerated components (classes, operators, API endpoints, \
+CLI commands, UI features) listed in the spec.
+Do NOT split a single spec item into multiple deliverables. Do NOT merge \
+multiple spec items into one. Use the spec's own granularity.
 
-Be specific about what has been implemented versus what remains. Rate \
-completion percentage based on the ratio of spec goals that have \
-corresponding PRs or commits."""
+ASSESSMENT RULES:
+1. For each deliverable, you MUST cite specific evidence (a PR number, commit \
+message, or file path from the provided data). If no evidence exists, set \
+status to "not_started" or "unclear" and confidence to "low".
+2. Do NOT guess completion percentages. Instead, count shipped vs total \
+deliverables. shipped_count and total_count must match the deliverables list.
+3. Do NOT invent blockers. Use the notes field for genuine uncertainties only.
+4. If codebase evidence shows shipped work NOT mentioned in the spec, add \
+those as deliverables with status "beyond_spec" and set \
+confluence_update_needed to true.
+5. PR numbers must come from the Pull Requests section of the input. Do not \
+invent PR numbers."""
 
 SYNTHESIS_SYSTEM_PROMPT = """\
 You are an Airflow release coordinator. Given individual AIP status \
 assessments, produce a concise cross-AIP progress report.
 
-Identify the top priorities, shared blockers across AIPs, and recommend \
-where maintainer attention is most needed. Keep the report actionable \
-and under 500 words."""
+RULES:
+1. Use ONLY the data from the individual assessments. Do not add information \
+not present in the inputs.
+2. Always write progress as "X/Y deliverables shipped" (e.g. "8/14 shipped"). \
+NEVER convert to percentages. Do not write "57%" or "90%" or any other \
+percentage. The fraction form is the only acceptable format.
+3. Identify AIPs where confluence_update_needed is true.
+4. Flag deliverables with confidence="low" as needing manual verification.
+5. Do NOT characterize AIPs as "near completion" or "minimal blockers" unless \
+the evidence explicitly supports that. Use the fraction (e.g. "9/10 shipped, \
+1 in progress") and let the reader draw conclusions.
+6. Keep the report actionable and under 500 words."""
+
+VALIDATION_SYSTEM_PROMPT = """\
+You are a fact-checker for an AIP progress report. You receive two inputs:
+1. A synthesized cross-AIP progress report
+2. The raw per-AIP evidence that the report was derived from
+
+Your ONLY job is to verify claims. A separate downstream step applies your \
+corrections, so each correction must be a self-contained replacement string \
+that can be substituted for the original claim text.
+
+RULES:
+1. Any deliverable status, PR number, shipped count, or recommendation in \
+the report must have a corresponding entry in the raw evidence.
+2. If a claim has no supporting evidence, set grounded=false and provide a \
+correction. The "claim" field must contain the EXACT text from the report \
+(so it can be found by string search). The "correction" field must contain \
+the replacement text, or "REMOVE" if the claim should be deleted entirely.
+3. Flag invented blockers, fabricated statistics, and PR numbers not in the \
+evidence.
+4. Flag any percentages (e.g. "57%", "90%") as ungrounded. Progress must be \
+expressed as fractions ("8/14 shipped"), never as percentages.
+5. Flag vague characterizations ("near completion", "minimal blockers", \
+"requires foundational work") that editorialize beyond the evidence. Provide \
+a factual replacement using data from the evidence.
+6. Set overall_verdict to "fail" if any high-confidence claims are ungrounded, 
\
+"pass_with_warnings" if only low-confidence claims are flagged, "pass" if \
+all claims are grounded."""
 
 
 # ---------------------------------------------------------------------------
@@ -178,94 +403,230 @@ def example_aip_progress_tracker():
 
     Task graph::
 
-        fetch_aip_list (@task)
-            → gather_aip_evidence    (@task ×N, via Dynamic Task Mapping)
-            → format_analysis_prompt (@task ×N)
-            → analyze_aip            (LLMOperator ×N, structured output)
-            → collect_analyses       (@task)
-            → synthesize_report      (LLMOperator, with UsageLimits)
-            → review_report          (ApprovalOperator)
+        fetch_aip_list  ─┐
+                          ├─> gather_aip_evidence   (@task ×N, Dynamic Task 
Mapping)
+        fetch_repo_tree ─┘    → format_analysis_prompt (@task ×N)
+                              → analyze_aip            (LLMOperator ×N, 
structured output)
+                              → collect_analyses       (@task)
+                              → format_report          (@task)
+                              → synthesize_report      (LLMOperator, with 
UsageLimits)
+                              → validate_report        (LLMOperator, 
hallucination check)
+                              → apply_validation       (@task, deterministic 
corrections)
+                              → build_review_body      (@task)
+                              → review_report          (ApprovalOperator)
     """
 
     # ------------------------------------------------------------------
-    # Step 1: Fetch the list of active AIPs to investigate.
-    # The length of this list determines how many mapped instances are
-    # created in the downstream steps -- N is decided at runtime.
+    # Step 1: Build the AIP list from the registry, using Confluence page
+    # IDs for direct spec fetching (no CQL search needed).
     # ------------------------------------------------------------------
     @task
     def fetch_aip_list(params: dict) -> list[dict]:
         aip_numbers = [int(n.strip()) for n in 
params["aip_numbers"].split(",") if n.strip()]
         aips = []
         for num in aip_numbers:
-            cql = urllib.parse.quote(
-                f'space="AIRFLOW" AND title~"AIP-{num}" AND 
ancestor={AIP_LISTING_PAGE_ID}'
-            )
-            results = 
_confluence_rest_get(f"/rest/api/content/search?cql={cql}&limit=1")
-            if results.get("results"):
-                title = results["results"][0]["title"]
+            registry_entry = AIP_REGISTRY.get(num)
+            if registry_entry:
+                aips.append(
+                    {
+                        "aip_number": num,
+                        "title": f"AIP-{num}: {registry_entry['topic']}",
+                        "page_id": registry_entry["page_id"],
+                        "search_terms": registry_entry["search_terms"],
+                        "codebase_paths": registry_entry["codebase_paths"],
+                    }
+                )
             else:
-                title = f"AIP-{num}"
-            aips.append({"aip_number": num, "title": title})
+                aips.append(
+                    {
+                        "aip_number": num,
+                        "title": f"AIP-{num}",
+                        "page_id": None,
+                        "search_terms": [],
+                        "codebase_paths": [],
+                    }
+                )
         return aips
 
     aip_list = fetch_aip_list()
 
     # ------------------------------------------------------------------
-    # Step 2: Gather evidence for each AIP from multiple sources.
-    # Each mapped instance fetches one AIP's spec text from the
-    # Confluence wiki (cwiki.apache.org) and searches GitHub for
-    # related PRs and commits.  If the GitHub API is rate-limited
-    # for one AIP, only that instance retries.
+    # Step 2: Fetch the repo file tree from GitHub once, shared across
+    # all mapped AIP tasks.  Falls back to per-path directory listings
+    # if the tree is truncated (common for large repos).
     # ------------------------------------------------------------------
     @task
-    def gather_aip_evidence(aip: dict) -> dict:
+    def fetch_repo_tree() -> list[str]:
+        data = 
_github_api_get(f"/repos/{GITHUB_REPO}/git/trees/main?recursive=1", delay=False)
+        if not data:
+            return []
+
+        if not data.get("truncated"):
+            return [item["path"] for item in data.get("tree", []) if 
item["type"] == "blob"]
+
+        # Tree was truncated -- fetch directory listings for known paths
+        known_dirs: set[str] = set()
+        for entry in AIP_REGISTRY.values():
+            for p in entry["codebase_paths"]:
+                parts = p.rstrip("/").split("/")
+                if len(parts) >= 2:
+                    known_dirs.add("/".join(parts[:3]))
+
+        all_files: list[str] = []
+        for dir_path in sorted(known_dirs):
+            contents = 
_github_api_get(f"/repos/{GITHUB_REPO}/contents/{dir_path}")
+            if isinstance(contents, list):
+                for item in contents:
+                    if item["type"] == "file":
+                        all_files.append(item["path"])
+                    elif item["type"] == "dir":
+                        sub = 
_github_api_get(f"/repos/{GITHUB_REPO}/contents/{item['path']}")
+                        if isinstance(sub, list):
+                            all_files.extend(s["path"] for s in sub if 
s["type"] == "file")
+        return all_files
+
+    repo_tree = fetch_repo_tree()
+
+    # ------------------------------------------------------------------
+    # Step 3: Gather evidence from Confluence, GitHub, and the file tree.
+    # Each mapped instance fetches one AIP's data.  Multi-strategy GitHub
+    # search uses both the AIP tag and topic-specific keywords.
+    # ------------------------------------------------------------------
+    @task
+    def gather_aip_evidence(aip: dict, repo_tree: list[str]) -> dict:
         aip_number = aip["aip_number"]
-        cql = urllib.parse.quote(
-            f'space="AIRFLOW" AND title~"AIP-{aip_number}" AND 
ancestor={AIP_LISTING_PAGE_ID}'
-        )
-        results = 
_confluence_rest_get(f"/rest/api/content/search?cql={cql}&expand=body.view&limit=1")
-        spec_text = ""
-        if results.get("results"):
-            raw_html = results["results"][0]["body"]["view"]["value"]
-            spec_text = _strip_html_tags(raw_html)[:3000]
-        pr_query = urllib.parse.quote(f"AIP-{aip_number} repo:{GITHUB_REPO} 
is:pr")
-        pr_data = _github_api_get(f"/search/issues?q={pr_query}&per_page=10")
-        prs = [f"#{it['number']} -- {it['title']}" for it in 
pr_data.get("items", [])]
-        commit_query = urllib.parse.quote(f"AIP-{aip_number} 
repo:{GITHUB_REPO}")
-        commit_data = 
_github_api_get(f"/search/commits?q={commit_query}&per_page=10")
-        commits = [it["commit"]["message"].split("\n")[0] for it in 
commit_data.get("items", [])]
+
+        # --- Confluence spec ---
+        spec_sections: dict[str, str] = {}
+        last_modified = ""
+        if aip.get("page_id"):
+            page = 
_confluence_rest_get(f"/rest/api/content/{aip['page_id']}?expand=body.storage,version")
+            raw_html = page.get("body", {}).get("storage", {}).get("value", "")
+            spec_sections = _extract_spec_sections(raw_html)
+            version_info = page.get("version", {})
+            last_modified = version_info.get("when", "")
+        else:
+            cql = urllib.parse.quote(f'space="AIRFLOW" AND 
title~"AIP-{aip_number}"')
+            results = 
_confluence_rest_get(f"/rest/api/content/search?cql={cql}&expand=body.storage&limit=1")
+            if results.get("results"):
+                raw_html = results["results"][0].get("body", 
{}).get("storage", {}).get("value", "")
+                spec_sections = _extract_spec_sections(raw_html)
+
+        # Build spec text with section budgets
+        spec_parts = []
+        for heading in (
+            "status",
+            "deliverables",
+            "milestones",
+            "phases",
+            "scope",
+            "completion criteria",
+            "definition of done",
+        ):
+            for key, text in spec_sections.items():
+                if heading in key:
+                    spec_parts.append(f"[{key}]: {text[:4000]}")
+                    break
+
+        remaining_budget = 8000 - sum(len(p) for p in spec_parts)
+        for key, text in spec_sections.items():
+            if not any(key in p for p in spec_parts):
+                chunk = text[: min(1000, max(200, remaining_budget))]
+                spec_parts.append(f"[{key}]: {chunk}")
+                remaining_budget -= len(chunk)
+                if remaining_budget <= 0:
+                    break
+
+        # --- GitHub: multi-strategy PR and commit search ---
+        seen_pr_numbers: set[int] = set()
+        prs: list[str] = []
+        seen_commit_shas: set[str] = set()
+        commits: list[str] = []
+
+        search_queries = [f"AIP-{aip_number}"]
+        for term in aip.get("search_terms", [])[:2]:
+            search_queries.append(term)
+
+        for query_term in search_queries:
+            # PR search
+            pr_query = urllib.parse.quote(f"{query_term} repo:{GITHUB_REPO} 
is:pr")
+            pr_data = 
_github_api_get(f"/search/issues?q={pr_query}&per_page=10")
+            for item in pr_data.get("items", []):
+                if item["number"] not in seen_pr_numbers:
+                    seen_pr_numbers.add(item["number"])
+                    state = "merged" if item.get("pull_request", 
{}).get("merged_at") else item["state"]
+                    prs.append(f"#{item['number']} ({state}) -- 
{item['title']}")
+
+            time.sleep(GITHUB_API_DELAY if not os.environ.get("GITHUB_TOKEN") 
else 1)
+
+            # Commit search
+            commit_query = urllib.parse.quote(f"{query_term} 
repo:{GITHUB_REPO}")
+            commit_data = 
_github_api_get(f"/search/commits?q={commit_query}&per_page=10")
+            for item in commit_data.get("items", []):
+                sha = item["sha"][:7]
+                if sha not in seen_commit_shas:
+                    seen_commit_shas.add(sha)
+                    commits.append(f"{sha} -- 
{item['commit']['message'].split(chr(10))[0]}")
+
+            time.sleep(GITHUB_API_DELAY if not os.environ.get("GITHUB_TOKEN") 
else 1)
+
+        # --- File tree evidence ---
+        file_evidence: list[str] = []
+        for path_prefix in aip.get("codebase_paths", []):
+            matching = [f for f in repo_tree if f.startswith(path_prefix)]
+            if matching:
+                test_files = [f for f in matching if "/test_" in f or 
"/tests/" in f]
+                file_evidence.append(
+                    f"{path_prefix}: {len(matching)} files"
+                    + (f" ({len(test_files)} test files)" if test_files else 
"")
+                )
+                for f in matching[:5]:
+                    file_evidence.append(f"  - {f}")
+                if len(matching) > 5:
+                    file_evidence.append(f"  ... and {len(matching) - 5} more")
+
         return {
             "aip_number": aip_number,
             "title": aip["title"],
-            "spec_text": spec_text,
+            "last_modified": last_modified,
+            "spec_sections": spec_parts,
             "prs": prs,
             "commits": commits,
+            "file_evidence": file_evidence,
         }
 
-    evidence = gather_aip_evidence.expand(aip=aip_list)
+    evidence = 
gather_aip_evidence.partial(repo_tree=repo_tree).expand(aip=aip_list)
 
     # ------------------------------------------------------------------
-    # Step 3: Format the gathered evidence into an LLM analysis prompt.
-    # Separating formatting from data gathering keeps each task focused
-    # and makes prompt iteration independent of API logic.
+    # Step 4: Format the gathered evidence into an LLM analysis prompt.
+    # Clear section headers help the LLM distinguish spec claims from
+    # implementation evidence.
     # ------------------------------------------------------------------
     @task
     def format_analysis_prompt(evidence: dict) -> str:
-        prs_text = "\n".join(f"  - {pr}" for pr in evidence["prs"])
-        commits_text = "\n".join(f"  - {c}" for c in evidence["commits"])
+        spec_text = "\n".join(evidence.get("spec_sections", [])) or "(spec not 
available)"
+        prs_text = "\n".join(f"  - {pr}" for pr in evidence["prs"]) or "  
(none found)"
+        commits_text = "\n".join(f"  - {c}" for c in evidence["commits"]) or " 
 (none found)"
+        files_text = "\n".join(evidence.get("file_evidence", [])) or "  (none 
found)"
+
+        modified = (
+            f"\nLast Confluence update: {evidence['last_modified']}" if 
evidence.get("last_modified") else ""
+        )
+
         return (
-            f"Analyze AIP-{evidence['aip_number']}: {evidence['title']}\n\n"
-            f"Specification:\n{evidence['spec_text']}\n\n"
-            f"Pull Requests:\n{prs_text}\n\n"
-            f"Recent Commits:\n{commits_text}"
+            f"Analyze {evidence['title']}{modified}\n\n"
+            f"=== SPECIFICATION (from Confluence) ===\n{spec_text}\n\n"
+            f"=== PULL REQUESTS (from GitHub, {len(evidence['prs'])} found) 
===\n{prs_text}\n\n"
+            f"=== COMMITS (from GitHub, {len(evidence['commits'])} found) 
===\n{commits_text}\n\n"
+            f"=== CODEBASE FILES (from GitHub tree) ===\n{files_text}"
         )
 
     prompts = format_analysis_prompt.expand(evidence=evidence)
 
     # ------------------------------------------------------------------
-    # Step 4: Analyze each AIP with a structured LLM call.
+    # Step 5: Analyze each AIP with a structured LLM call.
     # Dynamic Task Mapping creates one LLMOperator instance per AIP.
-    # output_type=AIPStatus enforces the Pydantic schema on the response.
+    # output_type=AIPStatus enforces the Pydantic schema.
     # ------------------------------------------------------------------
     # [START aip_tracker_dtm_analysis]
     analyses = LLMOperator.partial(
@@ -273,37 +634,83 @@ def example_aip_progress_tracker():
         llm_conn_id=LLM_CONN_ID,
         system_prompt=ANALYSIS_SYSTEM_PROMPT,
         output_type=AIPStatus,
+        serialize_output=True,
+        agent_params={"model_settings": {"temperature": 0}},
     ).expand(prompt=prompts)
     # [END aip_tracker_dtm_analysis]
 
     # ------------------------------------------------------------------
-    # Step 5: Collect all per-AIP analyses into a single context string
-    # for the synthesis step.
+    # Step 6: Collect all per-AIP analyses into a combined context.
     # ------------------------------------------------------------------
     @task
-    def collect_analyses(analyses: list) -> str:
-        sections = []
+    def collect_analyses(analyses: list) -> list[dict]:
+        result = []
         for raw in analyses:
-            a = json.loads(raw) if isinstance(raw, str) else raw
-            blockers = ", ".join(a["blockers"]) if a["blockers"] else "None 
identified"
-            next_steps = ", ".join(a["next_steps"]) if a["next_steps"] else 
"N/A"
-            sections.append(
-                f"## AIP-{a['aip_number']}: {a['title']}\n"
-                f"Status: {a['implementation_status']} "
-                f"({a['completion_pct']}% complete)\n"
-                f"Summary: {a['spec_summary']}\n"
-                f"Key PRs: {', '.join(a['key_prs'])}\n"
-                f"Blockers: {blockers}\n"
-                f"Next steps: {next_steps}"
-            )
-        return "\n\n".join(sections)
+            if hasattr(raw, "model_dump"):
+                result.append(raw.model_dump())
+            elif isinstance(raw, str):
+                result.append(json.loads(raw))
+            else:
+                result.append(raw)
+        return result
 
     collected = collect_analyses(analyses.output)
 
     # ------------------------------------------------------------------
-    # Step 6: Synthesize a cross-AIP progress report.
+    # Step 7: Format the structured analyses as readable markdown.
+    # This serves as a non-LLM audit artifact and provides cleaner
+    # input for the synthesis and validation steps.
+    # ------------------------------------------------------------------
+    @task
+    def format_report(analyses: list[dict]) -> str:
+        sections = []
+        for a in analyses:
+            shipped = [d for d in a["deliverables"] if d["status"] == 
"shipped"]
+            beyond = [d for d in a["deliverables"] if d["status"] == 
"beyond_spec"]
+            in_progress = [d for d in a["deliverables"] if d["status"] == 
"in_progress"]
+            not_started = [d for d in a["deliverables"] if d["status"] == 
"not_started"]
+            unclear = [d for d in a["deliverables"] if d["status"] == 
"unclear"]
+
+            lines = [
+                f"## AIP-{a['aip_number']}: {a['title']}",
+                f"Progress: {a['shipped_count']}/{a['total_count']} 
deliverables shipped",
+                f"Confluence update needed: {'YES' if 
a['confluence_update_needed'] else 'No'}",
+                f"\n{a['spec_summary']}",
+            ]
+            if shipped:
+                lines.append(f"\n**Shipped ({len(shipped)}):**")
+                for d in shipped:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if beyond:
+                lines.append(f"\n**Beyond spec ({len(beyond)}):**")
+                for d in beyond:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if in_progress:
+                lines.append(f"\n**In progress ({len(in_progress)}):**")
+                for d in in_progress:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if not_started:
+                lines.append(f"\n**Not started ({len(not_started)}):**")
+                for d in not_started:
+                    lines.append(f"  - {d['name']}")
+            if unclear:
+                lines.append(f"\n**Unclear ({len(unclear)}):**")
+                for d in unclear:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if a.get("notes"):
+                lines.append(f"\n**Notes:** {a['notes']}")
+            if a["key_prs"]:
+                lines.append(f"\n**Key PRs:** {', '.join(a['key_prs'])}")
+            sections.append("\n".join(lines))
+
+        return "\n\n---\n\n".join(sections)
+
+    formatted = format_report(collected)
+
+    # ------------------------------------------------------------------
+    # Step 8: Synthesize a cross-AIP progress report.
     # UsageLimits caps the token spend so a runaway prompt cannot
-    # exhaust the API budget in a single Dag run.
+    # exhaust the API budget in a single DAG run.
     # ------------------------------------------------------------------
     # [START aip_tracker_synthesis]
     synthesize = LLMOperator(
@@ -312,9 +719,11 @@ def example_aip_progress_tracker():
         system_prompt=SYNTHESIS_SYSTEM_PROMPT,
         prompt="""\
 Create a cross-AIP progress report from these individual assessments.
-Prioritize AIPs that are close to completion or have shared blockers.
+Prioritize AIPs that are close to completion or have shared dependencies.
+Use only the data below -- do not add external information.
 
-{{ ti.xcom_pull(task_ids='collect_analyses') }}""",
+{{ ti.xcom_pull(task_ids='format_report') }}""",
+        agent_params={"model_settings": {"temperature": 0}},
         usage_limits=UsageLimits(
             request_limit=5,
             input_tokens_limit=20_000,
@@ -322,18 +731,169 @@ Prioritize AIPs that are close to completion or have 
shared blockers.
         ),
     )
     # [END aip_tracker_synthesis]
-    collected >> synthesize
+    formatted >> synthesize
+
+    # ------------------------------------------------------------------
+    # Step 9: AI-powered hallucination validation.
+    # A second LLM checks every claim in the synthesized report against
+    # the raw per-AIP evidence.  Its only job is to judge and propose
+    # corrections -- a separate deterministic step applies them.
+    # ------------------------------------------------------------------
+    # [START aip_tracker_validation]
+    validate = LLMOperator(
+        task_id="validate_report",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=VALIDATION_SYSTEM_PROMPT,
+        prompt="""\
+Verify the following synthesized report against the raw per-AIP evidence.
+Flag any claims not grounded in the evidence.
+
+=== SYNTHESIZED REPORT ===
+{{ ti.xcom_pull(task_ids='synthesize_report') }}
+
+=== RAW PER-AIP EVIDENCE ===
+{{ ti.xcom_pull(task_ids='format_report') }}""",
+        output_type=ValidationResult,
+        serialize_output=True,
+        usage_limits=UsageLimits(
+            request_limit=5,
+            input_tokens_limit=30_000,
+            output_tokens_limit=8_000,
+        ),
+        agent_params={"model_settings": {"temperature": 0}},
+    )
+    # [END aip_tracker_validation]
+    synthesize >> validate
+
+    # ------------------------------------------------------------------
+    # Step 10: Apply validation corrections deterministically.
+    # No LLM involved -- this is a mechanical find-and-replace using
+    # the validator's claim/correction pairs.  Ensures every flagged
+    # issue is actually fixed in the final report.
+    # ------------------------------------------------------------------
+    @task
+    def apply_validation(report: str, validation: dict, analyses: list[dict]) 
-> dict:
+        corrected = report
+        applied = 0
+        for claim in validation.get("ungrounded_claims", []):
+            if claim["grounded"]:
+                continue
+            original = claim.get("claim", "")
+            correction = claim.get("correction", "")
+            if not original:
+                continue
+            if correction == "REMOVE":
+                if original in corrected:
+                    corrected = corrected.replace(original, "")
+                    applied += 1
+            elif correction and original in corrected:
+                corrected = corrected.replace(original, correction)
+                applied += 1
+
+        # --- Arithmetic validation against ground-truth analysis data ---
+        # Build lookup of correct counts from the structured per-AIP analyses.
+        truth = {}
+        total_shipped_all = 0
+        total_all = 0
+        for a in analyses:
+            aip_num = a["aip_number"]
+            shipped = a["shipped_count"]
+            total = a["total_count"]
+            truth[aip_num] = (shipped, total)
+            total_shipped_all += shipped
+            total_all += total
+
+        arithmetic_fixes = 0
+
+        # Fix per-AIP fractions: find "X/Y" near "AIP-{N}" and correct if 
wrong.
+        for aip_num, (shipped, total) in truth.items():
+            pattern = 
re.compile(rf"(AIP-{aip_num}\b[^\n]{{0,80}}?)\b(\d+)/(\d+)\b")
+            for match in pattern.finditer(corrected):
+                found_x, found_y = int(match.group(2)), int(match.group(3))
+                if found_x != shipped or found_y != total:
+                    old = match.group(0)
+                    new = old[: match.start(2) - match.start(0)] + 
f"{shipped}/{total}"
+                    corrected = corrected.replace(old, new, 1)
+                    arithmetic_fixes += 1
+
+        # Fix the cross-AIP summary total.
+        summary_pattern = 
re.compile(r"(Five|Four|Three|\d+)\s+AIPs\s+tracked\s+across\s+(\d+)/(\d+)")
+        summary_match = summary_pattern.search(corrected)
+        if summary_match:
+            found_shipped = int(summary_match.group(2))
+            found_total = int(summary_match.group(3))
+            if found_shipped != total_shipped_all or found_total != total_all:
+                old_summary = summary_match.group(0)
+                aip_count = len(truth)
+                number_words = {3: "Three", 4: "Four", 5: "Five"}
+                count_word = number_words.get(aip_count, str(aip_count))
+                new_summary = f"{count_word} AIPs tracked across 
{total_shipped_all}/{total_all}"
+                corrected = corrected.replace(old_summary, new_summary, 1)
+                arithmetic_fixes += 1
+
+        return {
+            "corrected_report": corrected.strip(),
+            "verdict": validation.get("overall_verdict", "unknown"),
+            "hallucination_risk": validation.get("hallucination_risk", 
"unknown"),
+            "total_claims_flagged": len(
+                [c for c in validation.get("ungrounded_claims", []) if not 
c["grounded"]]
+            ),
+            "corrections_applied": applied,
+            "arithmetic_fixes": arithmetic_fixes,
+            "ungrounded_claims": validation.get("ungrounded_claims", []),
+        }
+
+    validated = apply_validation(synthesize.output, validate.output, collected)
+
+    # ------------------------------------------------------------------
+    # Step 11: Build the review body for the human reviewer, showing
+    # the corrected report, validation verdict, and any flagged claims.
+    # ------------------------------------------------------------------
+    @task
+    def build_review_body(validated: dict) -> str:
+        verdict = validated["verdict"]
+        risk = validated["hallucination_risk"]
+        flagged = validated["total_claims_flagged"]
+        applied = validated["corrections_applied"]
+        arith = validated.get("arithmetic_fixes", 0)
+        claims = validated["ungrounded_claims"]
+
+        lines = [
+            f"# AIP Progress Report (Validation: {verdict.upper()})",
+            f"Hallucination risk: {risk}",
+            f"Claims flagged: {flagged} | Corrections applied: {applied} | 
Arithmetic fixes: {arith}\n",
+        ]
+
+        if claims:
+            ungrounded = [c for c in claims if not c["grounded"]]
+            if ungrounded:
+                lines.append(f"## {len(ungrounded)} Ungrounded Claim(s)\n")
+                for claim in ungrounded:
+                    lines.append(f"- **Original:** {claim['claim']}")
+                    correction = claim.get("correction", "")
+                    if correction == "REMOVE":
+                        lines.append("  **Action:** Removed")
+                    elif correction:
+                        lines.append(f"  **Replaced with:** {correction}")
+                lines.append("")
+
+        lines.append("---\n")
+        lines.append(validated["corrected_report"])
+
+        return "\n".join(lines)
+
+    review_body = build_review_body(validated)
 
     # ------------------------------------------------------------------
-    # Step 7: A maintainer reviews the synthesized report before it is
-    # shared on the dev list.  The Dag pauses here until the human
-    # approves, requests changes, or the timeout expires.
+    # Step 12: A maintainer reviews the corrected report.  The DAG
+    # pauses here until the human approves, requests changes, or the
+    # timeout expires.
     # ------------------------------------------------------------------
     # [START aip_tracker_hitl]
     ApprovalOperator(
         task_id="review_report",
-        subject="Review AIP Progress Report before sharing",
-        body=synthesize.output,
+        subject="Review AIP Progress Report (AI-validated)",
+        body=review_body,
         response_timeout=timedelta(hours=24),
     )
     # [END aip_tracker_hitl]
@@ -342,3 +902,244 @@ Prioritize AIPs that are close to completion or have 
shared blockers.
 # [END example_aip_progress_tracker]
 
 example_aip_progress_tracker()
+
+
+# ===========================================================================
+# DAG 2: Agent-based AIP tracker (AgentOperator + AgentSkillsToolset)
+#
+# Same use case, different architecture.  Instead of a 12-task deterministic
+# pipeline, a single AgentOperator with the aip-tracker skill loaded via
+# AgentSkillsToolset.  The agent discovers the skill's grounding rules and
+# calls custom tools to gather evidence from Confluence and GitHub.
+# ===========================================================================
+
+SKILLS_DIR = str(Path(__file__).parent / "skills")
+
+# ---------------------------------------------------------------------------
+# Tool functions the agent can call to gather evidence.
+# These are plain Python functions -- the agent sees their docstrings and
+# decides when and how to call them based on the skill instructions.
+# Reuses _github_headers and _safe_api_get defined above.
+# ---------------------------------------------------------------------------
+
+
+def _safe_api_get(url: str, headers: dict[str, str] | None = None) -> dict | 
list | str:
+    """GET a URL, returning parsed JSON or an error string."""
+    req = urllib.request.Request(url, headers=headers or {})
+    try:
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            return json.loads(resp.read().decode())
+    except urllib.error.HTTPError as e:
+        return f"HTTP {e.code}: {e.reason}"
+    except Exception as e:
+        return f"Error: {e}"
+
+
+def fetch_confluence_page(page_id: str) -> str:
+    """Fetch an AIP specification page from the Apache Confluence wiki.
+
+    Args:
+        page_id: The Confluence page ID (e.g. "311626969" for AIP-76).
+
+    Returns:
+        The page title, last-modified date, and body content (HTML).
+        Returns an error message if the page cannot be fetched.
+    """
+    url = 
f"{CONFLUENCE_BASE_URL}/rest/api/content/{page_id}?expand=body.storage,version"
+    data = _safe_api_get(url)
+    if not isinstance(data, dict):
+        return str(data)
+
+    title = data.get("title", "Unknown")
+    version = data.get("version", {})
+    modified = version.get("when", "unknown")
+    body_html = data.get("body", {}).get("storage", {}).get("value", "")
+
+    body_text = re.sub(r"<[^>]+>", " ", body_html)
+    body_text = re.sub(r"\s+", " ", body_text).strip()
+    if len(body_text) > 12000:
+        body_text = body_text[:12000] + "... (truncated)"
+
+    return f"Title: {title}\nLast modified: {modified}\n\n{body_text}"
+
+
+def search_github_prs(query: str) -> str:
+    """Search GitHub for pull requests and commits related to an AIP.
+
+    Call this with the AIP number (e.g. "AIP-76") and also with topic
+    keywords (e.g. "asset partition") to find commits not tagged with
+    the AIP number.
+
+    Args:
+        query: Search query (e.g. "AIP-76" or "asset partition 
PartitionMapper").
+
+    Returns:
+        A list of matching PRs and commits with titles, numbers, and status.
+    """
+    headers = _github_headers()
+    encoded = urllib.parse.quote(f"{query} repo:{GITHUB_REPO}")
+
+    pr_url = 
f"https://api.github.com/search/issues?q={encoded}+type:pr&per_page=15&sort=updated";
+    time.sleep(GITHUB_API_DELAY)
+    pr_data = _safe_api_get(pr_url, headers)
+
+    lines = []
+    if isinstance(pr_data, dict):
+        for item in pr_data.get("items", [])[:15]:
+            state = item.get("state", "unknown")
+            merged = ""
+            if item.get("pull_request", {}).get("merged_at"):
+                merged = " (merged)"
+            lines.append(f"PR #{item['number']}: {item['title']} 
[{state}{merged}]")
+
+    commit_url = 
f"https://api.github.com/search/commits?q={encoded}&per_page=10&sort=committer-date";
+    time.sleep(GITHUB_API_DELAY)
+    commit_data = _safe_api_get(commit_url, headers)
+
+    if isinstance(commit_data, dict):
+        for item in commit_data.get("items", [])[:10]:
+            sha = item.get("sha", "")[:7]
+            msg = item.get("commit", {}).get("message", "").split("\n")[0]
+            lines.append(f"Commit {sha}: {msg}")
+
+    return "\n".join(lines) if lines else "No results found."
+
+
+def get_repo_file_tree(path_prefix: str) -> str:
+    """List files in the Apache Airflow repository under a given path.
+
+    Args:
+        path_prefix: Directory path to list (e.g. 
"providers/common/ai/src/airflow/providers/common/ai/operators").
+
+    Returns:
+        A list of file paths under the given prefix, with counts of source
+        and test files.
+    """
+    headers = _github_headers()
+    url = 
f"https://api.github.com/repos/{GITHUB_REPO}/git/trees/main?recursive=1";
+    time.sleep(GITHUB_API_DELAY)
+    data = _safe_api_get(url, headers)
+
+    if not isinstance(data, dict):
+        return str(data)
+
+    tree = data.get("tree", [])
+    matching = [
+        item["path"] for item in tree if item.get("type") == "blob" and 
item["path"].startswith(path_prefix)
+    ]
+
+    if not matching:
+        return f"No files found under {path_prefix}"
+
+    source_files = [f for f in matching if f.endswith(".py") and "/tests/" not 
in f]
+    test_files = [f for f in matching if "/tests/" in f]
+
+    lines = [f"Found {len(matching)} files under {path_prefix}:"]
+    lines.append(f"  Source files: {len(source_files)}, Test files: 
{len(test_files)}")
+    for f in matching[:20]:
+        lines.append(f"  - {f}")
+    if len(matching) > 20:
+        lines.append(f"  ... and {len(matching) - 20} more")
+
+    return "\n".join(lines)
+
+
+# ---------------------------------------------------------------------------
+# Agent system prompt -- reinforces the skill's rules at the system level.
+# ---------------------------------------------------------------------------
+
+AGENT_SYSTEM_PROMPT = """\
+You are an Airflow project analyst assessing AIP implementation progress.
+
+You have access to an aip-tracker skill that provides detailed assessment \
+methodology. Load it first and follow its instructions precisely.
+
+HARD RULES (these override any conflicting tendency):
+1. Gather evidence from ALL three sources (Confluence spec, GitHub search, \
+file tree) for every AIP before writing any assessment.
+2. Extract deliverables from the spec's own structure -- completion criteria, \
+phase definitions, or enumerated components. Do NOT split a single spec \
+item into multiple deliverables (e.g. individual files within one component). \
+Do NOT merge multiple spec items into one. Match the spec's granularity.
+3. Always express progress as "X/Y deliverables shipped". NEVER convert to \
+percentages. Do not write "73%" or "72%" or any percentage.
+4. Every deliverable must cite evidence from tool results. If no evidence \
+exists, mark it "not_started" or "unclear" -- do not guess.
+5. Do NOT invent blockers, risks, or characterizations beyond the evidence. \
+Do NOT editorialize with phrases like "near completion", "Advanced maturity", \
+or "substantially implemented". State the fraction and let the reader judge.
+6. PR numbers must come from search_github_prs results. Never fabricate them.
+7. Before returning the report, run the self-verification checklist in the \
+skill. Fix any violations before returning."""
+
+
+# [START example_aip_progress_tracker_skills]
+@dag(
+    tags=["example", "ai", "skills"],
+    params={
+        "aip_numbers": Param(
+            default=DEFAULT_AIP_NUMBERS,
+            type="string",
+            description="Comma-separated AIP numbers to track",
+        ),
+    },
+)
+def example_aip_progress_tracker_skills():
+    """Skills-based AIP tracker using AgentSkillsToolset.
+
+    Same use case as ``example_aip_progress_tracker`` but solved with a
+    single ``AgentOperator`` that loads the ``aip-tracker`` skill and
+    decides its own evidence-gathering strategy.
+    """
+    from pydantic_ai.toolsets import FunctionToolset
+
+    aip_toolset = FunctionToolset(
+        tools=[fetch_confluence_page, search_github_prs, get_repo_file_tree],
+    )
+
+    aip_info = "\n".join(
+        f"- AIP-{num}: {info['topic']} (page_id={info['page_id']}, "
+        f"paths: {', '.join(info['codebase_paths'][:3])})"
+        for num, info in AIP_REGISTRY.items()
+    )
+
+    prompt = (
+        "Track the implementation progress of these AIPs and produce a "
+        "cross-AIP progress report:\n\n"
+        f"{aip_info}\n\n"
+        "Use the aip-tracker skill for detailed instructions on how to "
+        "gather evidence and structure your assessment."
+    )
+
+    # [START aip_tracker_skills_operator]
+    report = AgentOperator(
+        task_id="track_aip_progress",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=AGENT_SYSTEM_PROMPT,
+        prompt=prompt,
+        toolsets=[
+            AgentSkillsToolset(sources=[SKILLS_DIR]),
+            aip_toolset,
+        ],
+        agent_params={"model_settings": {"temperature": 0}},
+        usage_limits=UsageLimits(
+            request_limit=30,
+            input_tokens_limit=200_000,
+            output_tokens_limit=16_000,
+        ),
+    )
+    # [END aip_tracker_skills_operator]
+
+    # [START aip_tracker_skills_hitl]
+    ApprovalOperator(
+        task_id="review_report",
+        subject="Review AIP Progress Report (Skills-generated)",
+        body=report.output,
+        response_timeout=timedelta(hours=24),
+    )
+    # [END aip_tracker_skills_hitl]
+
+
+# [END example_aip_progress_tracker_skills]
+
+example_aip_progress_tracker_skills()
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/aip-tracker/SKILL.md
 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/aip-tracker/SKILL.md
new file mode 100644
index 00000000000..19111931129
--- /dev/null
+++ 
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/aip-tracker/SKILL.md
@@ -0,0 +1,143 @@
+---
+name: aip-tracker
+description: Track Airflow Improvement Proposal (AIP) implementation progress 
by comparing Confluence specs against codebase evidence. Use when asked to 
assess, report on, or compare AIP status.
+license: Apache-2.0
+---
+<!-- SPDX-License-Identifier: Apache-2.0
+     https://www.apache.org/licenses/LICENSE-2.0 -->
+
+# AIP Progress Tracker Skill
+
+Assess implementation progress of Airflow Improvement Proposals by comparing
+what the Confluence spec promises against what the codebase delivers.
+
+## When to Use This Skill
+
+Use this skill when the task involves:
+
+- Tracking AIP implementation status
+- Comparing a spec against shipped code
+- Producing a cross-AIP progress report
+
+## Available Tools
+
+You have access to these tools for gathering evidence:
+
+- `fetch_confluence_page(page_id)` -- fetch an AIP spec from Confluence
+- `search_github_prs(query)` -- search GitHub PRs/commits for an AIP
+- `get_repo_file_tree(path_prefix)` -- list files under a directory in the repo
+
+## Evidence Gathering Strategy
+
+For each AIP, gather evidence from **all three sources** before assessing:
+
+1. **Confluence spec** -- call `fetch_confluence_page` with the AIP's page ID.
+   Extract: deliverables, phases, completion criteria, and status.
+2. **GitHub PRs and commits** -- call `search_github_prs` with both the AIP
+   number (e.g. "AIP-76") AND topic keywords (e.g. "asset partition").
+   Deduplicate results.
+3. **Codebase file tree** -- call `get_repo_file_tree` with the AIP's known
+   directory prefixes. Count source files and test files.
+
+## Deliverable Extraction
+
+Extract deliverables from the specification's own structure. Use these
+sources in priority order:
+
+1. **Numbered completion criteria** (e.g. "Definition of Done", "Completion
+   Criteria") -- each numbered item is one deliverable.
+2. **Phase definitions** -- each bullet or item under a phase heading is one
+   deliverable.
+3. **Explicitly enumerated components** (classes, operators, API endpoints,
+   CLI commands, UI features) listed in the spec.
+
+**Critical:** Do NOT split a single spec item into multiple deliverables
+(e.g. do not count individual files within one component as separate
+deliverables). Do NOT merge multiple spec items into one. Use the spec's
+own granularity -- if the spec says "partition mappers" as one item, that
+is one deliverable, not five separate file-level deliverables.
+
+## Assessment Rules
+
+1. For each deliverable, you MUST cite specific evidence (a PR number,
+   commit message, or file path from tool results). If no evidence exists,
+   mark the deliverable as "not_started" or "unclear" with low confidence.
+2. Always express progress as fractions: "8/12 deliverables shipped".
+   NEVER convert to percentages. Do not write "73%" or "72%" or any
+   percentage. The fraction form "X/Y shipped" is the ONLY acceptable
+   format.
+3. Do NOT invent blockers or risks not directly supported by evidence.
+   Use a "Notes" field for genuine uncertainties only.
+4. If codebase evidence shows shipped work NOT mentioned in the spec,
+   list it as "beyond_spec" and flag that the Confluence page needs
+   updating.
+5. PR numbers must come from `search_github_prs` results. Never
+   fabricate or guess PR numbers.
+6. Do NOT characterize AIPs with vague editorializing like "near
+   completion", "minimal blockers", "requires foundational work", or
+   "Advanced maturity". State the fraction (e.g. "9/10 shipped,
+   1 in progress") and let the reader draw conclusions.
+7. Do NOT add information beyond what the tools return. If a tool
+   returns no results for a query, say so -- do not fill the gap
+   with assumptions.
+
+## Output Format
+
+Structure the report as follows. Do not add sections, emojis, or
+formatting beyond this template:
+
+```
+# AIP Progress Report
+
+## Summary
+- N AIPs tracked
+- Per-AIP: AIP-{N} {Title}: X/Y shipped
+
+## Per-AIP Breakdown
+
+### AIP-{N}: {Title}
+Confluence status: {status from spec}
+Progress: X/Y deliverables shipped
+Confluence update needed: {Yes/No}
+
+**Shipped ({count}):**
+  - {deliverable}: {evidence -- PR number or file path} [{confidence}]
+
+**In progress ({count}):**
+  - {deliverable}: PR #{number} [{confidence}]
+
+**Not started ({count}):**
+  - {deliverable}
+
+**Beyond spec ({count}):**
+  - {deliverable}: {evidence} [{confidence}]
+
+**Notes:** {genuine uncertainties only, or omit if none}
+
+## Cross-AIP Dependencies
+- {only if tool evidence shows shared PRs or dependency chains}
+
+## Confluence Updates Needed
+- {list AIPs where spec is stale and what needs updating}
+```
+
+## Self-Verification (MANDATORY before returning the report)
+
+Before returning the final report, re-read it and verify each of
+these rules. If any check fails, fix the report before returning it.
+
+1. Every deliverable cites evidence from tool results, or is explicitly
+   marked "unclear" or "not_started".
+2. No percentages appear anywhere in the report -- only "X/Y shipped"
+   fractions.
+3. Every PR number mentioned appears in `search_github_prs` results.
+   Remove any that do not.
+4. No invented blockers, risks, or characterizations beyond the
+   evidence.
+5. Beyond-spec items are flagged for Confluence update.
+6. Deliverable granularity matches the spec's own structure -- no
+   file-level splitting of spec-level items.
+7. No vague editorializing ("near completion", "Advanced maturity",
+   "substantially implemented"). Replace with the fraction.
+8. shipped_count + in_progress_count + not_started_count +
+   beyond_spec_count = total_count for each AIP.

Reply via email to