This is an automated email from the ASF dual-hosted git repository.

akm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new f63509b  Fixes and json export
f63509b is described below

commit f63509be6e5f07d1b5b663c4145fdcf0dc88560b
Author: Andrew Musselman <[email protected]>
AuthorDate: Mon Apr 6 16:30:33 2026 -0700

    Fixes and json export
---
 repos/apache/github-review/agents/json-export.py | 239 ++++++++++++++++++
 repos/apache/github-review/agents/pre-fetch.py   | 308 +++++++++++++++++++++++
 repos/apache/github-review/agents/publishing.py  | 215 ++++++++++------
 repos/apache/github-review/agents/security.py    | 203 ++++++++-------
 4 files changed, 789 insertions(+), 176 deletions(-)

diff --git a/repos/apache/github-review/agents/json-export.py 
b/repos/apache/github-review/agents/json-export.py
new file mode 100644
index 0000000..de557aa
--- /dev/null
+++ b/repos/apache/github-review/agents/json-export.py
@@ -0,0 +1,239 @@
+from agent_factory.remote_mcp_client import RemoteMCPClient
+from services.llm_service import call_llm
+import httpx
+import re
+
+async def run(input_dict, tools):
+    mcpc = { url : RemoteMCPClient(remote_url = url) for url in tools.keys() }
+    http_client = httpx.AsyncClient()
+    try:
+        owner = input_dict.get("owner", "apache")
+        print(f"Agent 4 (JSON export) starting for owner={owner}", flush=True)
+
+        report_ns = data_store.use_namespace(f"ci-report:{owner}")
+        security_ns = data_store.use_namespace(f"ci-security:{owner}")
+        classification_ns = 
data_store.use_namespace(f"ci-classification:{owner}")
+
+        pub_stats = report_ns.get("latest_stats")
+        sec_stats = security_ns.get("latest_stats")
+
+        if not pub_stats or not sec_stats:
+            return {"outputText": "Error: Run Agent 1 and Agent 2 first."}
+
+        # --- Collect all repos ---
+        publishing_repos = set(pub_stats.get("publishing_repos", []))
+
+        # --- Read per-repo classifications from Agent 1 cache ---
+        all_cls_keys = classification_ns.list_keys()
+        meta_keys = [k for k in all_cls_keys if k.startswith("__meta__:")]
+
+        repo_workflows = {}  # repo -> [workflow classifications]
+        for mk in meta_keys:
+            repo = mk.replace("__meta__:", "")
+            meta = classification_ns.get(mk)
+            if not meta or not meta.get("complete"):
+                continue
+            wf_names = meta.get("workflows", [])
+            if not wf_names:
+                repo_workflows[repo] = []
+                continue
+            wfs = []
+            for wf_name in wf_names:
+                cls = classification_ns.get(f"{repo}:{wf_name}")
+                if cls:
+                    wfs.append(cls)
+            repo_workflows[repo] = wfs
+
+        print(f"Read classifications for {len(repo_workflows)} repos", 
flush=True)
+
+        # --- Read per-repo security findings from Agent 2 cache ---
+        all_sec_keys = security_ns.list_keys()
+        finding_keys = [k for k in all_sec_keys if k.startswith("findings:")]
+
+        repo_findings = {}
+        for k in finding_keys:
+            repo = k.replace("findings:", "")
+            findings = security_ns.get(k)
+            if findings and isinstance(findings, list):
+                repo_findings[repo] = findings
+
+        print(f"Read findings for {len(repo_findings)} repos", flush=True)
+
+        # --- Build per-repo JSON ---
+        all_repos = sorted(set(repo_workflows.keys()) | 
set(repo_findings.keys()))
+
+        CATEGORY_ORDER = ["release_artifact", "snapshot_artifact", 
"ci_infrastructure", "documentation", "none"]
+        SEV_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]
+
+        def safe_str(val):
+            if val is None:
+                return ""
+            if isinstance(val, dict):
+                return json.dumps(val)
+            if isinstance(val, list):
+                return ", ".join(str(v) for v in val)
+            return str(val).strip()
+
+        def classify_workflow(w):
+            """Extract clean workflow record from classification cache."""
+            cat = safe_str(w.get("category")).lower().strip()
+            if cat not in CATEGORY_ORDER:
+                cat = "none"
+
+            ecosystems = []
+            for e in (w.get("ecosystems") or []):
+                e_str = safe_str(e).lower().strip().replace(" ", "_")
+                if e_str and e_str != "github_actions_artifacts":
+                    ecosystems.append(e_str)
+
+            # Normalize security notes to strings
+            notes = []
+            for n in (w.get("security_notes") or []):
+                if isinstance(n, str):
+                    notes.append(n.strip())
+                elif isinstance(n, dict):
+                    risk = n.get("risk_level") or n.get("risk") or "INFO"
+                    desc = n.get("description") or n.get("details") or str(n)
+                    notes.append(f"[{risk}] {desc}")
+
+            return {
+                "file": w.get("file", "unknown"),
+                "workflow_name": safe_str(w.get("workflow_name")) or 
w.get("file", "unknown"),
+                "publishes": bool(w.get("publishes_to_registry")),
+                "category": cat,
+                "ecosystems": ecosystems,
+                "trigger": safe_str(w.get("trigger")),
+                "auth_method": safe_str(w.get("auth_method")),
+                "publish_actions": w.get("publish_actions") or [],
+                "publish_commands": w.get("publish_commands") or [],
+                "summary": safe_str(w.get("summary")),
+                "confidence": safe_str(w.get("confidence")),
+                "security_notes": notes,
+            }
+
+        def classify_finding(f):
+            """Extract clean finding record."""
+            return {
+                "severity": f.get("severity", "INFO"),
+                "check": f.get("check", "unknown"),
+                "file": f.get("file", "unknown"),
+                "description": f.get("description", ""),
+            }
+
+        def summarize_severities(findings):
+            counts = {}
+            for f in findings:
+                s = f.get("severity", "INFO")
+                counts[s] = counts.get(s, 0) + 1
+            return counts
+
+        def summarize_checks(findings):
+            counts = {}
+            for f in findings:
+                chk = f.get("check", "unknown")
+                counts[chk] = counts.get(chk, 0) + 1
+            return counts
+
+        repos_json = []
+        for repo in all_repos:
+            wfs = repo_workflows.get(repo, [])
+            findings = repo_findings.get(repo, [])
+            publishes = repo in publishing_repos
+
+            # Classify workflows
+            workflow_records = [classify_workflow(w) for w in wfs]
+
+            # Workflow category summary
+            cat_counts = {}
+            eco_set = set()
+            for wr in workflow_records:
+                if wr["publishes"]:
+                    cat_counts[wr["category"]] = 
cat_counts.get(wr["category"], 0) + 1
+                    eco_set.update(wr["ecosystems"])
+
+            # Finding records
+            finding_records = [classify_finding(f) for f in findings]
+            sev_counts = summarize_severities(finding_records)
+            check_counts = summarize_checks(finding_records)
+
+            # Worst severity
+            worst = "none"
+            for s in SEV_ORDER:
+                if sev_counts.get(s, 0) > 0:
+                    worst = s
+                    break
+
+            # Trusted publishing: does this repo have TP opportunities?
+            has_tp = False
+            tp_ecosystems = []
+            for wr in workflow_records:
+                if not wr["publishes"] or wr["category"] not in 
("release_artifact", "snapshot_artifact"):
+                    continue
+                auth_lower = wr["auth_method"].lower()
+                if "oidc" in auth_lower or "trusted publisher" in auth_lower 
or "id-token" in auth_lower:
+                    continue
+                token_pats = ["token", "password", "secret", "api_key", 
"apikey", "nexus_user", "nexus_pw"]
+                uses_token = any(p in auth_lower for p in token_pats)
+                if uses_token:
+                    tp_eligible = {"pypi", "npm", "nuget", "rubygems", 
"crates_io"}
+                    for eco in wr["ecosystems"]:
+                        if eco in tp_eligible:
+                            has_tp = True
+                            if eco not in tp_ecosystems:
+                                tp_ecosystems.append(eco)
+
+            repos_json.append({
+                "repo": f"{owner}/{repo}",
+                "has_workflows": len(wfs) > 0,
+                "total_workflows": len(wfs),
+                "publishes_to_registry": publishes,
+                "ecosystems": sorted(eco_set),
+                "category_counts": cat_counts,
+                "trusted_publishing": {
+                    "migration_needed": has_tp,
+                    "eligible_ecosystems": sorted(tp_ecosystems),
+                },
+                "security": {
+                    "total_findings": len(finding_records),
+                    "worst_severity": worst,
+                    "severity_counts": {s: sev_counts.get(s, 0) for s in 
SEV_ORDER if sev_counts.get(s, 0) > 0},
+                    "check_counts": check_counts,
+                },
+                "workflows": workflow_records,
+                "findings": finding_records,
+            })
+
+        # --- Build top-level summary ---
+        output = {
+            "schema_version": "1.0",
+            "owner": owner,
+            "generated_at": 
__import__("datetime").datetime.utcnow().isoformat() + "Z",
+            "summary": {
+                "repos_scanned": pub_stats.get("repos_scanned", 0),
+                "repos_with_workflows": pub_stats.get("repos_with_workflows", 
0),
+                "total_workflows": pub_stats.get("total_workflows", 0),
+                "repos_publishing": len(publishing_repos),
+                "ecosystem_counts": pub_stats.get("ecosystem_counts", {}),
+                "category_counts": pub_stats.get("by_category", {}),
+                "trusted_publishing_opportunities": 
pub_stats.get("trusted_publishing_opportunities", 0),
+                "security": {
+                    "total_findings": sec_stats.get("total_findings", 0),
+                    "repos_with_findings": 
sec_stats.get("repos_with_findings", 0),
+                    "severity_counts": sec_stats.get("severity_counts", {}),
+                    "check_counts": sec_stats.get("check_counts", {}),
+                },
+            },
+            "repos": repos_json,
+        }
+
+        output_json = json.dumps(output, indent=2, ensure_ascii=False)
+        print(f"JSON report: {len(output_json)} chars, {len(repos_json)} 
repos", flush=True)
+
+        # Store in data store
+        combined_ns = data_store.use_namespace(f"ci-combined:{owner}")
+        combined_ns.set("latest_json", output)
+
+        return {"outputText": output_json}
+
+    finally:
+        await http_client.aclose()
\ No newline at end of file
diff --git a/repos/apache/github-review/agents/pre-fetch.py 
b/repos/apache/github-review/agents/pre-fetch.py
new file mode 100644
index 0000000..c2bbc54
--- /dev/null
+++ b/repos/apache/github-review/agents/pre-fetch.py
@@ -0,0 +1,308 @@
+from agent_factory.remote_mcp_client import RemoteMCPClient
+from services.llm_service import call_llm
+import httpx
+
+async def run(input_dict, tools):
+    mcpc = { url : RemoteMCPClient(remote_url = url) for url in tools.keys() }
+    http_client = httpx.AsyncClient()
+    try:
+        owner = input_dict.get("owner", "apache")
+        all_repos_raw = input_dict.get("all_repos", "false")
+        repos_str = input_dict.get("repos", "").strip()
+        github_pat = input_dict.get("github_pat", "").strip()
+        clear_cache_raw = input_dict.get("clear_cache", "false")
+
+        all_repos = str(all_repos_raw).lower().strip() in ("true", "1", "yes")
+        clear_cache = str(clear_cache_raw).lower().strip() in ("true", "1", 
"yes")
+
+        if not github_pat:
+            return {"outputText": "Error: `github_pat` is required.\n"
+                    "Create a fine-grained PAT with **Contents: read** at 
https://github.com/settings/tokens"}
+
+        if not all_repos and not repos_str:
+            return {"outputText": "Error: provide `repos` or set `all_repos` 
to `true`."}
+
+        GITHUB_API = "https://api.github.com";
+        gh_headers = {"Accept": "application/vnd.github.v3+json",
+                      "Authorization": f"token {github_pat}"}
+
+        workflow_cache = data_store.use_namespace(f"ci-workflows:{owner}")
+        classification_cache = 
data_store.use_namespace(f"ci-classification:{owner}")
+
+        if clear_cache:
+            print("Clearing workflow cache...", flush=True)
+            for key in workflow_cache.list_keys():
+                workflow_cache.delete(key)
+            print("Cache cleared.", flush=True)
+
+        # --- Preflight ---
+        preflight_resp = await http_client.get(f"{GITHUB_API}/rate_limit", 
headers=gh_headers, timeout=15.0)
+        if preflight_resp.status_code == 401:
+            return {"outputText": "Error: GitHub PAT is invalid or expired 
(HTTP 401)."}
+        if preflight_resp.status_code == 200:
+            rate = preflight_resp.json().get("resources", {}).get("core", {})
+            print(f"GitHub API: {rate.get('remaining', 
'?')}/{rate.get('limit', '?')} remaining", flush=True)
+
+        # --- GitHub GET with retry/rate-limit handling ---
+        async def github_get(url, params=None):
+            for attempt in range(5):
+                try:
+                    resp = await http_client.get(url, headers=gh_headers, 
params=params, timeout=30.0)
+                except Exception as e:
+                    print(f"  HTTP error (attempt {attempt+1}): 
{str(e)[:80]}", flush=True)
+                    if attempt < 4:
+                        await asyncio.sleep(2 ** attempt)
+                        continue
+                    return None
+
+                if resp.status_code == 429:
+                    wait = int(resp.headers.get("Retry-After", "60"))
+                    print(f"  Rate limited, waiting {wait}s...", flush=True)
+                    await asyncio.sleep(min(wait, 120))
+                    continue
+
+                if resp.status_code == 403:
+                    if resp.headers.get("X-RateLimit-Remaining", "") == "0":
+                        print(f"  Rate limit exhausted, waiting 60s...", 
flush=True)
+                        await asyncio.sleep(60)
+                        continue
+
+                remaining = resp.headers.get("X-RateLimit-Remaining")
+                if remaining:
+                    try:
+                        rem = int(remaining)
+                        if rem < 50:
+                            print(f"  WARNING: {rem} API requests remaining", 
flush=True)
+                            await asyncio.sleep(5)
+                        elif rem < 100:
+                            await asyncio.sleep(2)
+                    except ValueError:
+                        pass
+
+                return resp
+            return None
+
+        # --- Step 1: Get repo list ---
+        if all_repos:
+            print(f"Fetching all repos for {owner}...", flush=True)
+            repo_names = []
+            skipped = 0
+            page = 1
+            while True:
+                resp = await github_get(
+                    f"{GITHUB_API}/orgs/{owner}/repos",
+                    params={"per_page": 100, "page": page, "sort": "pushed", 
"type": "public"})
+                if resp is None or resp.status_code != 200:
+                    break
+                data = resp.json()
+                if not data or not isinstance(data, list):
+                    break
+                for r in data:
+                    if isinstance(r, dict) and "name" in r:
+                        if r.get("archived"):
+                            skipped += 1
+                        else:
+                            repo_names.append(r["name"])
+                if 'rel="next"' not in resp.headers.get("Link", ""):
+                    break
+                page += 1
+                await asyncio.sleep(0.3)
+            print(f"Found {len(repo_names)} active repos ({skipped} archived 
skipped)", flush=True)
+        else:
+            repo_names = [r.strip() for r in repos_str.split(",") if r.strip()]
+            print(f"Using provided list of {len(repo_names)} repos", 
flush=True)
+
+        if not repo_names:
+            return {"outputText": "No repositories found."}
+
+        # --- Build index of what's already cached ---
+        print("Building cache index...", flush=True)
+        all_cached_keys = set(workflow_cache.list_keys())
+        print(f"  {len(all_cached_keys)} keys in workflow cache", flush=True)
+
+        def has_prefetch(repo_name):
+            meta = workflow_cache.get(f"__prefetch__:{repo_name}")
+            return meta and meta.get("complete")
+
+        def has_composites(repo_name):
+            meta = workflow_cache.get(f"__composites__:{repo_name}")
+            return meta and meta.get("complete")
+
+        def is_classified(repo_name):
+            meta = classification_cache.get(f"__meta__:{repo_name}")
+            return meta and meta.get("complete")
+
+        # --- Semaphore for concurrent fetches ---
+        api_sem = asyncio.Semaphore(10)
+
+        # --- Step 2: Prefetch workflows ---
+        stats = {"repos": 0, "wf_skipped": 0, "wf_fetched": 0, 
"wf_no_workflows": 0,
+                 "wf_yaml_cached": 0, "wf_yaml_existed": 0,
+                 "ca_skipped": 0, "ca_fetched": 0, "ca_repos_with": 0, 
"ca_total": 0,
+                 "errors": 0}
+
+        async def fetch_single_yaml(repo_name, wf_name, download_url):
+            """Fetch one YAML file if not already cached."""
+            cache_key = f"{repo_name}/{wf_name}"
+            if cache_key in all_cached_keys:
+                return True, True  # success, was_cached
+
+            async with api_sem:
+                try:
+                    resp = await http_client.get(download_url, 
follow_redirects=True, timeout=30.0)
+                    if resp.status_code == 200:
+                        workflow_cache.set(cache_key, resp.text)
+                        all_cached_keys.add(cache_key)
+                        return True, False  # success, newly fetched
+                except Exception:
+                    pass
+            return False, False
+
+        for idx, repo_name in enumerate(repo_names):
+            stats["repos"] += 1
+
+            if (idx + 1) % 50 == 0 or idx == 0:
+                print(f"[{idx + 1}/{len(repo_names)}] Prefetching 
{repo_name}... "
+                      f"({stats['wf_fetched']} fetched, {stats['wf_skipped']} 
skipped, "
+                      f"{stats['wf_yaml_cached']} YAMLs, "
+                      f"{stats['ca_total']} composites)", flush=True)
+
+            # ---- Workflows ----
+            if is_classified(repo_name) or has_prefetch(repo_name):
+                stats["wf_skipped"] += 1
+            else:
+                resp = await github_get(
+                    
f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
+
+                if resp is None:
+                    stats["errors"] += 1
+                elif resp.status_code == 404:
+                    workflow_cache.set(f"__prefetch__:{repo_name}",
+                                       {"complete": True, "workflows": []})
+                    stats["wf_no_workflows"] += 1
+                elif resp.status_code != 200:
+                    stats["errors"] += 1
+                else:
+                    try:
+                        dir_listing = resp.json()
+                    except Exception:
+                        dir_listing = None
+
+                    if not dir_listing or not isinstance(dir_listing, list):
+                        workflow_cache.set(f"__prefetch__:{repo_name}",
+                                           {"complete": True, "workflows": []})
+                        stats["wf_no_workflows"] += 1
+                    else:
+                        yaml_files = [f for f in dir_listing
+                                      if isinstance(f, dict)
+                                      and f.get("name", "").endswith((".yml", 
".yaml"))]
+
+                        if not yaml_files:
+                            workflow_cache.set(f"__prefetch__:{repo_name}",
+                                               {"complete": True, "workflows": 
[]})
+                            stats["wf_no_workflows"] += 1
+                        else:
+                            # Fetch all YAML concurrently
+                            tasks = []
+                            wf_names = []
+                            for wf_file in yaml_files:
+                                wf_name = wf_file.get("name", "unknown")
+                                wf_names.append(wf_name)
+                                dl_url = wf_file.get("download_url")
+                                if dl_url:
+                                    tasks.append(fetch_single_yaml(repo_name, 
wf_name, dl_url))
+
+                            results = await asyncio.gather(*tasks, 
return_exceptions=True)
+                            for r in results:
+                                if isinstance(r, Exception):
+                                    stats["errors"] += 1
+                                else:
+                                    success, was_cached = r
+                                    if success:
+                                        if was_cached:
+                                            stats["wf_yaml_existed"] += 1
+                                        else:
+                                            stats["wf_yaml_cached"] += 1
+
+                            workflow_cache.set(f"__prefetch__:{repo_name}",
+                                               {"complete": True, "workflows": 
wf_names})
+                            stats["wf_fetched"] += 1
+
+            # ---- Composite actions ----
+            if has_composites(repo_name):
+                stats["ca_skipped"] += 1
+            else:
+                resp = await github_get(
+                    
f"{GITHUB_API}/repos/{owner}/{repo_name}/git/trees/HEAD?recursive=1")
+
+                composite_names = []
+                if resp and resp.status_code == 200:
+                    try:
+                        tree = resp.json().get("tree", [])
+                        action_files = [
+                            item["path"] for item in tree
+                            if item.get("path", 
"").startswith(".github/actions/")
+                            and item.get("path", "").endswith(("/action.yml", 
"/action.yaml"))
+                            and item.get("type") == "blob"
+                        ]
+
+                        for action_path in action_files:
+                            action_name = 
action_path.replace(".github/actions/", "").rsplit("/", 1)[0]
+                            short_path = 
f".github/actions/{action_name}/action.yml"
+                            cache_key = f"{repo_name}/{short_path}"
+
+                            # Already cached?
+                            if cache_key in all_cached_keys:
+                                composite_names.append(short_path)
+                                continue
+
+                            # Fetch it
+                            async with api_sem:
+                                aresp = await github_get(
+                                    
f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/{action_path}")
+                                if aresp and aresp.status_code == 200:
+                                    try:
+                                        dl_url = 
aresp.json().get("download_url")
+                                        if dl_url:
+                                            dl_resp = await http_client.get(
+                                                dl_url, follow_redirects=True, 
timeout=10.0)
+                                            if dl_resp.status_code == 200:
+                                                workflow_cache.set(cache_key, 
dl_resp.text)
+                                                all_cached_keys.add(cache_key)
+                                                
composite_names.append(short_path)
+                                                stats["ca_total"] += 1
+                                    except Exception:
+                                        pass
+
+                    except Exception as e:
+                        print(f"  Error scanning tree for {repo_name}: 
{str(e)[:80]}", flush=True)
+
+                if composite_names:
+                    stats["ca_repos_with"] += 1
+
+                workflow_cache.set(f"__composites__:{repo_name}", {
+                    "complete": True,
+                    "actions": composite_names,
+                })
+                stats["ca_fetched"] += 1
+
+        print(f"\n{'=' * 60}", flush=True)
+        print(f"Prefetch complete!", flush=True)
+        print(f"  Repos processed: {stats['repos']}", flush=True)
+        print(f"  Workflows:", flush=True)
+        print(f"    Skipped (already done): {stats['wf_skipped']}", flush=True)
+        print(f"    Newly fetched: {stats['wf_fetched']}", flush=True)
+        print(f"    No workflows: {stats['wf_no_workflows']}", flush=True)
+        print(f"    YAML files cached: {stats['wf_yaml_cached']} (already 
existed: {stats['wf_yaml_existed']})", flush=True)
+        print(f"  Composite actions:", flush=True)
+        print(f"    Skipped (already done): {stats['ca_skipped']}", flush=True)
+        print(f"    Repos scanned: {stats['ca_fetched']}", flush=True)
+        print(f"    Repos with composites: {stats['ca_repos_with']}", 
flush=True)
+        print(f"    Action files cached: {stats['ca_total']}", flush=True)
+        print(f"  Errors: {stats['errors']}", flush=True)
+        print(f"{'=' * 60}\n", flush=True)
+
+        return {"outputText": json.dumps(stats, indent=2)}
+
+    finally:
+        await http_client.aclose()
\ No newline at end of file
diff --git a/repos/apache/github-review/agents/publishing.py 
b/repos/apache/github-review/agents/publishing.py
index 662f9ed..c9ee349 100644
--- a/repos/apache/github-review/agents/publishing.py
+++ b/repos/apache/github-review/agents/publishing.py
@@ -407,19 +407,24 @@ async def run(input_dict, tools):
 
         print(f"\nStarting workflow scan of {len(repo_names)} repos...\n", 
flush=True)
 
-        # ===== STEP 2: Fetch workflows and classify =====
+        # ===== STEP 2: Fetch workflows and classify (parallel) =====
         all_results = {}
         stats = {"repos_scanned": 0, "repos_with_workflows": 0, 
"total_workflows": 0,
                  "total_classified": 0, "cache_hits": 0, "errors": []}
 
+        # Phase 1: Collect work items (sequential — checks caches, minimal API)
+        pending_llm = []  # [(repo_name, wf_name, yaml_content)]
+        repo_wf_names = {}  # repo -> [wf_names] for setting meta after 
classification
+
         for repo_idx, repo_name in enumerate(repo_names):
             stats["repos_scanned"] += 1
 
-            if (repo_idx + 1) % 25 == 0 or repo_idx == 0:
-                print(f"[{repo_idx + 1}/{len(repo_names)}] Scanning 
{repo_name}... "
-                      f"({stats['total_workflows']} wfs, 
{stats['total_classified']} classified, "
-                      f"{stats['cache_hits']} cached)", flush=True)
+            if (repo_idx + 1) % 100 == 0 or repo_idx == 0:
+                print(f"[{repo_idx + 1}/{len(repo_names)}] Collecting 
{repo_name}... "
+                      f"({stats['total_workflows']} wfs, {stats['cache_hits']} 
cached, "
+                      f"{len(pending_llm)} pending LLM)", flush=True)
 
+            # Already fully classified?
             meta_key = f"__meta__:{repo_name}"
             cached_meta = classification_cache.get(meta_key)
 
@@ -439,99 +444,151 @@ async def run(input_dict, tools):
                         stats["total_classified"] += len(repo_results)
                 continue
 
-            resp = await 
github_get(f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
+            # Check prefetch cache first (from prefetch agent)
+            prefetch_meta = 
workflow_content_cache.get(f"__prefetch__:{repo_name}")
+            wf_names_list = None
+            if prefetch_meta and prefetch_meta.get("complete"):
+                wf_names_list = prefetch_meta.get("workflows", [])
+                if not wf_names_list:
+                    # Prefetch confirmed no workflows
+                    classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
+                    continue
 
-            if resp is None:
-                stats["errors"].append(f"{owner}/{repo_name}: network error")
-                continue
-            if resp.status_code == 404:
-                classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
-                continue
-            if resp.status_code != 200:
-                stats["errors"].append(f"{owner}/{repo_name}: HTTP 
{resp.status_code}")
-                continue
+            # If no prefetch data, fetch from GitHub API
+            if wf_names_list is None:
+                resp = await 
github_get(f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
 
-            try:
-                dir_listing = resp.json()
-            except Exception:
-                classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
-                continue
+                if resp is None:
+                    stats["errors"].append(f"{owner}/{repo_name}: network 
error")
+                    continue
+                if resp.status_code == 404:
+                    classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
+                    continue
+                if resp.status_code != 200:
+                    stats["errors"].append(f"{owner}/{repo_name}: HTTP 
{resp.status_code}")
+                    continue
 
-            if not isinstance(dir_listing, list):
-                classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
-                continue
+                try:
+                    dir_listing = resp.json()
+                except Exception:
+                    classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
+                    continue
 
-            yaml_files = [f for f in dir_listing
-                          if isinstance(f, dict) and f.get("name", 
"").endswith((".yml", ".yaml"))]
+                if not isinstance(dir_listing, list):
+                    classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
+                    continue
 
-            if not yaml_files:
-                classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
-                continue
+                yaml_files = [f for f in dir_listing
+                              if isinstance(f, dict) and f.get("name", 
"").endswith((".yml", ".yaml"))]
+                wf_names_list = [f.get("name", "unknown") for f in yaml_files]
 
-            stats["repos_with_workflows"] += 1
-            repo_results = []
-            workflow_names = []
+                if not wf_names_list:
+                    classification_cache.set(meta_key, {"complete": True, 
"workflows": []})
+                    continue
 
-            for wf_file in yaml_files:
-                wf_name = wf_file.get("name", "unknown")
-                workflow_names.append(wf_name)
-                stats["total_workflows"] += 1
+                # Fetch YAML for any not already in content cache
+                for wf_file in yaml_files:
+                    wf_name = wf_file.get("name", "unknown")
+                    cache_key = f"{repo_name}/{wf_name}"
+                    if not workflow_content_cache.get(cache_key):
+                        raw_url = wf_file.get("download_url")
+                        if raw_url:
+                            try:
+                                content_resp = await http_client.get(raw_url, 
follow_redirects=True, timeout=30.0)
+                                if content_resp.status_code == 200:
+                                    workflow_content_cache.set(cache_key, 
content_resp.text)
+                            except Exception:
+                                pass
 
-                wf_cache_key = f"{repo_name}:{wf_name}"
-                cached_cls = classification_cache.get(wf_cache_key)
+            stats["repos_with_workflows"] += 1
+            stats["total_workflows"] += len(wf_names_list)
+            repo_wf_names[repo_name] = wf_names_list
+
+            # Check each workflow: cached classification or needs LLM?
+            for wf_name in wf_names_list:
+                cached_cls = classification_cache.get(f"{repo_name}:{wf_name}")
                 if cached_cls:
-                    repo_results.append(cached_cls)
-                    stats["total_classified"] += 1
+                    all_results.setdefault(repo_name, []).append(cached_cls)
                     stats["cache_hits"] += 1
+                    stats["total_classified"] += 1
                     continue
 
-                raw_url = wf_file.get("download_url")
-                yaml_content = None
-                if raw_url:
-                    try:
-                        content_resp = await http_client.get(raw_url, 
follow_redirects=True, timeout=30.0)
-                        if content_resp.status_code == 200:
-                            yaml_content = content_resp.text
-                    except Exception:
-                        pass
-
+                # Get YAML from content cache
+                yaml_content = 
workflow_content_cache.get(f"{repo_name}/{wf_name}")
                 if yaml_content is None:
-                    repo_results.append({"file": wf_name, "error": "Could not 
fetch", "publishes_to_registry": None})
+                    all_results.setdefault(repo_name, []).append(
+                        {"file": wf_name, "error": "Could not fetch", 
"publishes_to_registry": None})
                     continue
 
-                workflow_content_cache.set(f"{repo_name}/{wf_name}", 
yaml_content)
-                yaml_content = truncate_yaml(yaml_content)
+                pending_llm.append((repo_name, wf_name, yaml_content))
 
-                llm_response = None
-                try:
-                    messages = [{"role": "user", "content": (
-                        f"{CLASSIFICATION_PROMPT}\n\n---\n"
-                        f"File: 
{owner}/{repo_name}/.github/workflows/{wf_name}\n---\n\n{yaml_content}"
-                    )}]
-
-                    llm_response, _ = await call_llm(
-                        provider=provider, model=model, messages=messages,
-                        parameters=configured_params, user_service=None, 
user_id=None)
-
-                    classification = parse_classification(llm_response)
-                    classification["file"] = wf_name
-                    repo_results.append(classification)
-                    classification_cache.set(wf_cache_key, classification)
-                    stats["total_classified"] += 1
+        print(f"\nCollection complete: {stats['repos_scanned']} repos, "
+              f"{stats['total_workflows']} workflows, {stats['cache_hits']} 
cached, "
+              f"{len(pending_llm)} need LLM classification\n", flush=True)
 
-                except json.JSONDecodeError:
-                    repo_results.append({"file": wf_name, "error": "JSON parse 
error",
-                                         "raw_response": (llm_response or 
"")[:300], "publishes_to_registry": None})
-                    
stats["errors"].append(f"{owner}/{repo_name}/.github/workflows/{wf_name}: JSON 
parse error")
-                except Exception as e:
-                    repo_results.append({"file": wf_name, "error": 
str(e)[:200], "publishes_to_registry": None})
-                    
stats["errors"].append(f"{owner}/{repo_name}/.github/workflows/{wf_name}: 
{str(e)[:80]}")
+        # Phase 2: Parallel LLM classification
+        if pending_llm:
+            llm_sem = asyncio.Semaphore(5)
+            completed = {"count": 0}
 
-                await asyncio.sleep(0.3)
+            async def classify_workflow(repo_name, wf_name, yaml_content):
+                yaml_truncated = truncate_yaml(yaml_content)
+                messages = [{"role": "user", "content": (
+                    f"{CLASSIFICATION_PROMPT}\n\n---\n"
+                    f"File: 
{owner}/{repo_name}/.github/workflows/{wf_name}\n---\n\n{yaml_truncated}"
+                )}]
+
+                async with llm_sem:
+                    llm_response = None
+                    try:
+                        llm_response, _ = await call_llm(
+                            provider=provider, model=model, messages=messages,
+                            parameters=configured_params, user_service=None, 
user_id=None)
+
+                        classification = parse_classification(llm_response)
+                        classification["file"] = wf_name
+                        classification_cache.set(f"{repo_name}:{wf_name}", 
classification)
+
+                        completed["count"] += 1
+                        if completed["count"] % 25 == 0:
+                            print(f"  Classified 
{completed['count']}/{len(pending_llm)}...", flush=True)
+
+                        return repo_name, wf_name, classification, None
+
+                    except json.JSONDecodeError:
+                        error_result = {"file": wf_name, "error": "JSON parse 
error",
+                                        "raw_response": (llm_response or 
"")[:300],
+                                        "publishes_to_registry": None}
+                        return repo_name, wf_name, error_result, \
+                            f"{owner}/{repo_name}/.github/workflows/{wf_name}: 
JSON parse error"
+
+                    except Exception as e:
+                        error_result = {"file": wf_name, "error": str(e)[:200],
+                                        "publishes_to_registry": None}
+                        return repo_name, wf_name, error_result, \
+                            f"{owner}/{repo_name}/.github/workflows/{wf_name}: 
{str(e)[:80]}"
+
+            print(f"Starting parallel classification of {len(pending_llm)} 
workflows "
+                  f"(concurrency=5)...\n", flush=True)
+
+            tasks = [classify_workflow(r, w, y) for r, w, y in pending_llm]
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+
+            for result in results:
+                if isinstance(result, Exception):
+                    stats["errors"].append(f"Unexpected error: 
{str(result)[:100]}")
+                    continue
+
+                repo_name, wf_name, classification, error_msg = result
+                all_results.setdefault(repo_name, []).append(classification)
+                stats["total_classified"] += 1
+                if error_msg:
+                    stats["errors"].append(error_msg)
 
-            if repo_results:
-                all_results[repo_name] = repo_results
-            classification_cache.set(meta_key, {"complete": True, "workflows": 
workflow_names})
+        # Phase 3: Set meta keys for all processed repos
+        for repo_name, wf_names in repo_wf_names.items():
+            classification_cache.set(f"__meta__:{repo_name}", {
+                "complete": True, "workflows": wf_names})
 
         print(f"\n{'=' * 60}", flush=True)
         print(f"Scan complete! {stats['repos_scanned']} repos, 
{stats['total_classified']} classified "
diff --git a/repos/apache/github-review/agents/security.py 
b/repos/apache/github-review/agents/security.py
index 71ab0bb..fedbe4e 100644
--- a/repos/apache/github-review/agents/security.py
+++ b/repos/apache/github-review/agents/security.py
@@ -487,111 +487,120 @@ async def run(input_dict, tools):
                     "detail": "No dependabot.yml or renovate.json found.",
                 })
 
-            # Check 9: Composite actions via recursive Git Trees API
-            # One API call gets the entire tree, handles any nesting depth
+            # Check 9: Composite actions — read from prefetch cache or fall 
back to GitHub
             composite_findings = []
             composite_analyzed = 0
             composite_total = 0
 
-            resp = await github_get(
-                
f"{GITHUB_API}/repos/{owner}/{repo_name}/git/trees/HEAD?recursive=1")
-            if resp and resp.status_code == 200:
-                try:
-                    tree = resp.json().get("tree", [])
-                    action_files = [
-                        item["path"] for item in tree
-                        if item.get("path", "").startswith(".github/actions/")
-                        and item.get("path", "").endswith(("/action.yml", 
"/action.yaml"))
-                        and item.get("type") == "blob"
-                    ]
-                    composite_total = len(action_files)
-
-                    for action_path in action_files:
-                        # Extract action name: 
.github/actions/build/rust/action.yml -> build/rust
-                        action_name = action_path.replace(".github/actions/", 
"").rsplit("/", 1)[0]
-
-                        # Fetch the action.yml content
-                        aresp = await github_get(
-                            
f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/{action_path}")
-                        if not aresp or aresp.status_code != 200:
-                            continue
-
-                        try:
-                            dl_url = aresp.json().get("download_url")
-                            if not dl_url:
+            # Collect (action_name, short_path, action_content) tuples
+            composite_items = []
+
+            composites_meta = workflow_ns.get(f"__composites__:{repo_name}")
+            if composites_meta and composites_meta.get("complete"):
+                # Read from prefetch cache
+                cached_actions = composites_meta.get("actions", [])
+                for short_path in cached_actions:
+                    action_content = 
workflow_ns.get(f"{repo_name}/{short_path}")
+                    if action_content:
+                        action_name = short_path.replace(".github/actions/", 
"").rsplit("/", 1)[0]
+                        composite_items.append((action_name, short_path, 
action_content))
+            else:
+                # Fall back to GitHub API
+                resp = await github_get(
+                    
f"{GITHUB_API}/repos/{owner}/{repo_name}/git/trees/HEAD?recursive=1")
+                if resp and resp.status_code == 200:
+                    try:
+                        tree = resp.json().get("tree", [])
+                        action_files = [
+                            item["path"] for item in tree
+                            if item.get("path", 
"").startswith(".github/actions/")
+                            and item.get("path", "").endswith(("/action.yml", 
"/action.yaml"))
+                            and item.get("type") == "blob"
+                        ]
+                        for action_path in action_files:
+                            action_name = 
action_path.replace(".github/actions/", "").rsplit("/", 1)[0]
+                            aresp = await github_get(
+                                
f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/{action_path}")
+                            if not aresp or aresp.status_code != 200:
                                 continue
-                            dl_resp = await http_client.get(dl_url, 
follow_redirects=True, timeout=10.0)
-                            if dl_resp.status_code != 200:
+                            try:
+                                dl_url = aresp.json().get("download_url")
+                                if not dl_url:
+                                    continue
+                                dl_resp = await http_client.get(dl_url, 
follow_redirects=True, timeout=10.0)
+                                if dl_resp.status_code != 200:
+                                    continue
+                                action_content = dl_resp.text
+                                short_path = 
f".github/actions/{action_name}/action.yml"
+                                workflow_ns.set(f"{repo_name}/{short_path}", 
action_content)
+                                composite_items.append((action_name, 
short_path, action_content))
+                            except Exception:
                                 continue
-                            action_content = dl_resp.text
-                        except Exception:
-                            continue
+                    except Exception as e:
+                        print(f"  Error scanning composite actions for 
{repo_name}: {str(e)[:100]}", flush=True)
 
-                        composite_analyzed += 1
-                        short_path = 
f".github/actions/{action_name}/action.yml"
-
-                        # Store for other agents
-                        workflow_ns.set(f"{repo_name}/{short_path}", 
action_content)
-
-                        # Run injection checks
-                        context = f"composite action 
.github/actions/{action_name}"
-                        injections = 
find_injection_in_run_blocks(action_content, context_label=context)
-                        for sev, detail in injections:
-                            composite_findings.append({
-                                "check": "composite_action_injection",
-                                "severity": sev,
-                                "file": short_path,
-                                "detail": detail,
-                            })
-
-                        # Check unpinned actions inside composite
-                        ca_refs = extract_action_refs(action_content)
-                        for ref in ca_refs:
-                            parsed = parse_action_ref(ref)
-                            if parsed["type"] == "remote" and not 
parsed["pinned"]:
-                                composite_findings.append({
-                                    "check": "composite_action_unpinned",
-                                    "severity": "MEDIUM",
-                                    "file": short_path,
-                                    "detail": (f"Composite action uses 
unpinned action `{parsed['raw']}`. "
-                                               "Supply chain risk."),
-                                })
+            composite_total = len(composite_items)
 
-                        # Check inputs.* directly in run blocks (hidden 
injection)
-                        has_input_injection = False
-                        in_run = False
-                        run_indent = 0
-                        for cline in action_content.split("\n"):
-                            cs = cline.strip()
-                            if cs.startswith("run:"):
-                                in_run = True
-                                run_indent = len(cline) - len(cline.lstrip())
-                                rest = cs[4:].strip()
-                                if rest.startswith("|") or 
rest.startswith(">"):
-                                    continue
-                                if "inputs." in rest and "${{" in rest:
-                                    has_input_injection = True
-                                    break
-                            elif in_run:
-                                ci = len(cline) - len(cline.lstrip())
-                                if cs and ci <= run_indent:
-                                    in_run = False
-                                elif "inputs." in cline and "${{" in cline:
-                                    has_input_injection = True
-                                    break
-
-                        if has_input_injection:
-                            composite_findings.append({
-                                "check": "composite_action_input_injection",
-                                "severity": "HIGH",
-                                "file": short_path,
-                                "detail": (f"Composite action `{action_name}` 
directly interpolates "
-                                           "`inputs.*` in run block. Callers 
may pass untrusted values — "
-                                           "the injection is hidden from 
workflow-level analysis."),
-                            })
-
-                except Exception as e:
-                    print(f"  Error scanning composite actions for 
{repo_name}: {str(e)[:100]}", flush=True)
+            # Analyze each composite action
+            for action_name, short_path, action_content in composite_items:
+                composite_analyzed += 1
+
+                # Run injection checks
+                context = f"composite action .github/actions/{action_name}"
+                injections = find_injection_in_run_blocks(action_content, 
context_label=context)
+                for sev, detail in injections:
+                    composite_findings.append({
+                        "check": "composite_action_injection",
+                        "severity": sev,
+                        "file": short_path,
+                        "detail": detail,
+                    })
+
+                # Check unpinned actions inside composite
+                ca_refs = extract_action_refs(action_content)
+                for ref in ca_refs:
+                    parsed = parse_action_ref(ref)
+                    if parsed["type"] == "remote" and not parsed["pinned"]:
+                        composite_findings.append({
+                            "check": "composite_action_unpinned",
+                            "severity": "MEDIUM",
+                            "file": short_path,
+                            "detail": (f"Composite action uses unpinned action 
`{parsed['raw']}`. "
+                                       "Supply chain risk."),
+                        })
+
+                # Check inputs.* directly in run blocks (hidden injection)
+                has_input_injection = False
+                in_run = False
+                run_indent = 0
+                for cline in action_content.split("\n"):
+                    cs = cline.strip()
+                    if cs.startswith("run:"):
+                        in_run = True
+                        run_indent = len(cline) - len(cline.lstrip())
+                        rest = cs[4:].strip()
+                        if rest.startswith("|") or rest.startswith(">"):
+                            continue
+                        if "inputs." in rest and "${{" in rest:
+                            has_input_injection = True
+                            break
+                    elif in_run:
+                        ci = len(cline) - len(cline.lstrip())
+                        if cs and ci <= run_indent:
+                            in_run = False
+                        elif "inputs." in cline and "${{" in cline:
+                            has_input_injection = True
+                            break
+
+                if has_input_injection:
+                    composite_findings.append({
+                        "check": "composite_action_input_injection",
+                        "severity": "HIGH",
+                        "file": short_path,
+                        "detail": (f"Composite action `{action_name}` directly 
interpolates "
+                                   "`inputs.*` in run block. Callers may pass 
untrusted values — "
+                                   "the injection is hidden from 
workflow-level analysis."),
+                    })
 
             # Deduplicate composite findings per file before adding
             composite_findings = deduplicate_findings(composite_findings)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to