This is an automated email from the ASF dual-hosted git repository.
akm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-agents.git
The following commit(s) were added to refs/heads/main by this push:
new f63509b Fixes and json export
f63509b is described below
commit f63509be6e5f07d1b5b663c4145fdcf0dc88560b
Author: Andrew Musselman <[email protected]>
AuthorDate: Mon Apr 6 16:30:33 2026 -0700
Fixes and json export
---
repos/apache/github-review/agents/json-export.py | 239 ++++++++++++++++++
repos/apache/github-review/agents/pre-fetch.py | 308 +++++++++++++++++++++++
repos/apache/github-review/agents/publishing.py | 215 ++++++++++------
repos/apache/github-review/agents/security.py | 203 ++++++++-------
4 files changed, 789 insertions(+), 176 deletions(-)
diff --git a/repos/apache/github-review/agents/json-export.py
b/repos/apache/github-review/agents/json-export.py
new file mode 100644
index 0000000..de557aa
--- /dev/null
+++ b/repos/apache/github-review/agents/json-export.py
@@ -0,0 +1,239 @@
+from agent_factory.remote_mcp_client import RemoteMCPClient
+from services.llm_service import call_llm
+import httpx
+import re
+
+async def run(input_dict, tools):
+    """Agent 4: export the cached CI review data for one owner as JSON.
+
+    Reads ``latest_stats`` from the ``ci-report:{owner}`` and
+    ``ci-security:{owner}`` namespaces, the per-repo workflow classifications
+    from ``ci-classification:{owner}`` and the per-repo ``findings:*`` entries
+    from ``ci-security:{owner}``, merges them into one document (a top-level
+    summary plus one record per repo), stores that document under
+    ``latest_json`` in ``ci-combined:{owner}`` and returns it serialized.
+
+    Args:
+        input_dict: Agent input. Only ``owner`` is read (default ``"apache"``).
+        tools: Mapping keyed by remote MCP URL; only used to build ``mcpc``.
+
+    Returns:
+        ``{"outputText": <JSON text>}``, or ``{"outputText": <error message>}``
+        when either ``latest_stats`` entry is missing.
+    """
+    # Neither name is imported at file level. Import locally so the export
+    # does not depend on the host injecting them (harmless if it does).
+    import json
+    from datetime import datetime, timezone
+
+    # Template boilerplate: neither the MCP clients nor the HTTP client is
+    # used below, but both are kept so construction/cleanup stays unchanged.
+    mcpc = {url: RemoteMCPClient(remote_url=url) for url in tools.keys()}
+    http_client = httpx.AsyncClient()
+    try:
+        owner = input_dict.get("owner", "apache")
+        print(f"Agent 4 (JSON export) starting for owner={owner}", flush=True)
+
+        # `data_store` is not defined in this file; presumably it is provided
+        # by the agent runtime.
+        report_ns = data_store.use_namespace(f"ci-report:{owner}")
+        security_ns = data_store.use_namespace(f"ci-security:{owner}")
+        classification_ns = data_store.use_namespace(f"ci-classification:{owner}")
+
+        pub_stats = report_ns.get("latest_stats")
+        sec_stats = security_ns.get("latest_stats")
+
+        if not pub_stats or not sec_stats:
+            return {"outputText": "Error: Run Agent 1 and Agent 2 first."}
+
+        # --- Collect all repos ---
+        publishing_repos = set(pub_stats.get("publishing_repos", []))
+
+        # --- Read per-repo classifications from Agent 1 cache ---
+        meta_prefix = "__meta__:"
+        repo_workflows = {}  # repo -> [workflow classifications]
+        for mk in classification_ns.list_keys():
+            if not mk.startswith(meta_prefix):
+                continue
+            meta = classification_ns.get(mk)
+            if not meta or not meta.get("complete"):
+                continue
+            # Strip only the leading prefix (str.replace would also touch
+            # any later occurrence inside the repo name).
+            repo = mk[len(meta_prefix):]
+            wfs = []
+            for wf_name in meta.get("workflows", []):
+                cls = classification_ns.get(f"{repo}:{wf_name}")
+                if cls:
+                    wfs.append(cls)
+            repo_workflows[repo] = wfs
+
+        print(f"Read classifications for {len(repo_workflows)} repos", flush=True)
+
+        # --- Read per-repo security findings from Agent 2 cache ---
+        findings_prefix = "findings:"
+        repo_findings = {}
+        for k in security_ns.list_keys():
+            if not k.startswith(findings_prefix):
+                continue
+            findings = security_ns.get(k)
+            if findings and isinstance(findings, list):
+                repo_findings[k[len(findings_prefix):]] = findings
+
+        print(f"Read findings for {len(repo_findings)} repos", flush=True)
+
+        # --- Build per-repo JSON ---
+        all_repos = sorted(set(repo_workflows) | set(repo_findings))
+
+        CATEGORY_ORDER = ["release_artifact", "snapshot_artifact",
+                          "ci_infrastructure", "documentation", "none"]
+        SEV_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]
+        # Loop invariants for the trusted-publishing check below.
+        TP_CATEGORIES = ("release_artifact", "snapshot_artifact")
+        TP_ALREADY_MARKERS = ("oidc", "trusted publisher", "id-token")
+        TOKEN_PATTERNS = ("token", "password", "secret", "api_key",
+                          "apikey", "nexus_user", "nexus_pw")
+        TP_ELIGIBLE = {"pypi", "npm", "nuget", "rubygems", "crates_io"}
+
+        def safe_str(val):
+            """Coerce a cached value of unknown shape to a display string."""
+            if val is None:
+                return ""
+            if isinstance(val, dict):
+                return json.dumps(val)
+            if isinstance(val, list):
+                return ", ".join(str(v) for v in val)
+            return str(val).strip()
+
+        def classify_workflow(w):
+            """Extract clean workflow record from classification cache."""
+            cat = safe_str(w.get("category")).lower().strip()
+            if cat not in CATEGORY_ORDER:
+                cat = "none"
+
+            ecosystems = []
+            for e in (w.get("ecosystems") or []):
+                e_str = safe_str(e).lower().strip().replace(" ", "_")
+                if e_str and e_str != "github_actions_artifacts":
+                    ecosystems.append(e_str)
+
+            # Normalize security notes to strings
+            notes = []
+            for n in (w.get("security_notes") or []):
+                if isinstance(n, str):
+                    notes.append(n.strip())
+                elif isinstance(n, dict):
+                    risk = n.get("risk_level") or n.get("risk") or "INFO"
+                    desc = n.get("description") or n.get("details") or str(n)
+                    notes.append(f"[{risk}] {desc}")
+
+            return {
+                "file": w.get("file", "unknown"),
+                "workflow_name": safe_str(w.get("workflow_name")) or w.get("file", "unknown"),
+                "publishes": bool(w.get("publishes_to_registry")),
+                "category": cat,
+                "ecosystems": ecosystems,
+                "trigger": safe_str(w.get("trigger")),
+                "auth_method": safe_str(w.get("auth_method")),
+                "publish_actions": w.get("publish_actions") or [],
+                "publish_commands": w.get("publish_commands") or [],
+                "summary": safe_str(w.get("summary")),
+                "confidence": safe_str(w.get("confidence")),
+                "security_notes": notes,
+            }
+
+        def classify_finding(f):
+            """Extract clean finding record."""
+            return {
+                # Upper-case so the value always matches SEV_ORDER.
+                "severity": safe_str(f.get("severity")).upper() or "INFO",
+                "check": f.get("check", "unknown"),
+                "file": f.get("file", "unknown"),
+                # The security agent in this commit stores the text under
+                # "detail" (seen for the dependency-update check), so fall
+                # back to it instead of exporting an empty description.
+                "description": f.get("description") or f.get("detail") or "",
+            }
+
+        def count_by(records, field):
+            """Count records per distinct value of ``field``."""
+            counts = {}
+            for rec in records:
+                counts[rec[field]] = counts.get(rec[field], 0) + 1
+            return counts
+
+        repos_json = []
+        for repo in all_repos:
+            wfs = repo_workflows.get(repo, [])
+            findings = repo_findings.get(repo, [])
+            publishes = repo in publishing_repos
+
+            # Classify workflows
+            workflow_records = [classify_workflow(w) for w in wfs]
+
+            # Workflow category summary
+            # NOTE(review): indentation was lost in the archived diff; the
+            # ecosystem union is assumed to cover publishing workflows only,
+            # like the category counts - confirm against the repository.
+            cat_counts = {}
+            eco_set = set()
+            for wr in workflow_records:
+                if wr["publishes"]:
+                    cat_counts[wr["category"]] = cat_counts.get(wr["category"], 0) + 1
+                    eco_set.update(wr["ecosystems"])
+
+            # Finding records (skip malformed non-dict entries)
+            finding_records = [classify_finding(f) for f in findings
+                               if isinstance(f, dict)]
+            sev_counts = count_by(finding_records, "severity")
+            check_counts = count_by(finding_records, "check")
+
+            # Worst severity: first SEV_ORDER entry that occurs at least once
+            worst = next((s for s in SEV_ORDER if sev_counts.get(s, 0) > 0), "none")
+
+            # Trusted publishing: does this repo have TP opportunities?
+            has_tp = False
+            tp_ecosystems = []
+            for wr in workflow_records:
+                if not wr["publishes"] or wr["category"] not in TP_CATEGORIES:
+                    continue
+                auth_lower = wr["auth_method"].lower()
+                # Already on OIDC / trusted publishing: nothing to migrate.
+                if any(marker in auth_lower for marker in TP_ALREADY_MARKERS):
+                    continue
+                if not any(p in auth_lower for p in TOKEN_PATTERNS):
+                    continue
+                for eco in wr["ecosystems"]:
+                    if eco in TP_ELIGIBLE:
+                        has_tp = True
+                        if eco not in tp_ecosystems:
+                            tp_ecosystems.append(eco)
+
+            repos_json.append({
+                "repo": f"{owner}/{repo}",
+                "has_workflows": len(wfs) > 0,
+                "total_workflows": len(wfs),
+                "publishes_to_registry": publishes,
+                "ecosystems": sorted(eco_set),
+                "category_counts": cat_counts,
+                "trusted_publishing": {
+                    "migration_needed": has_tp,
+                    "eligible_ecosystems": sorted(tp_ecosystems),
+                },
+                "security": {
+                    "total_findings": len(finding_records),
+                    "worst_severity": worst,
+                    "severity_counts": {s: sev_counts[s] for s in SEV_ORDER
+                                        if sev_counts.get(s, 0) > 0},
+                    "check_counts": check_counts,
+                },
+                "workflows": workflow_records,
+                "findings": finding_records,
+            })
+
+        # --- Build top-level summary ---
+        # utcnow() is deprecated; take an aware UTC time and drop tzinfo so
+        # the rendered value keeps the original "...Z" shape.
+        generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
+        output = {
+            "schema_version": "1.0",
+            "owner": owner,
+            "generated_at": generated_at,
+            "summary": {
+                "repos_scanned": pub_stats.get("repos_scanned", 0),
+                "repos_with_workflows": pub_stats.get("repos_with_workflows", 0),
+                "total_workflows": pub_stats.get("total_workflows", 0),
+                "repos_publishing": len(publishing_repos),
+                "ecosystem_counts": pub_stats.get("ecosystem_counts", {}),
+                "category_counts": pub_stats.get("by_category", {}),
+                "trusted_publishing_opportunities": pub_stats.get("trusted_publishing_opportunities", 0),
+                "security": {
+                    "total_findings": sec_stats.get("total_findings", 0),
+                    "repos_with_findings": sec_stats.get("repos_with_findings", 0),
+                    "severity_counts": sec_stats.get("severity_counts", {}),
+                    "check_counts": sec_stats.get("check_counts", {}),
+                },
+            },
+            "repos": repos_json,
+        }
+
+        output_json = json.dumps(output, indent=2, ensure_ascii=False)
+        print(f"JSON report: {len(output_json)} chars, {len(repos_json)} repos", flush=True)
+
+        # Store in data store
+        combined_ns = data_store.use_namespace(f"ci-combined:{owner}")
+        combined_ns.set("latest_json", output)
+
+        return {"outputText": output_json}
+
+    finally:
+        await http_client.aclose()
\ No newline at end of file
diff --git a/repos/apache/github-review/agents/pre-fetch.py
b/repos/apache/github-review/agents/pre-fetch.py
new file mode 100644
index 0000000..c2bbc54
--- /dev/null
+++ b/repos/apache/github-review/agents/pre-fetch.py
@@ -0,0 +1,308 @@
+from agent_factory.remote_mcp_client import RemoteMCPClient
+from services.llm_service import call_llm
+import httpx
+
+async def run(input_dict, tools):
+    """Prefetch agent: cache workflow YAML and composite actions from GitHub.
+
+    For every selected repository this stores, in the
+    ``ci-workflows:{owner}`` namespace:
+
+    * ``{repo}/{workflow file}``: raw text of each ``.yml``/``.yaml`` file
+      under ``.github/workflows``;
+    * ``__prefetch__:{repo}``: ``{"complete": True, "workflows": [...]}``;
+    * ``{repo}/.github/actions/<name>/action.yml``: raw text of each
+      composite action found in the repository tree;
+    * ``__composites__:{repo}``: ``{"complete": True, "actions": [...]}``.
+
+    Repositories that already have a complete marker (or a complete
+    ``__meta__`` entry in ``ci-classification:{owner}``) are skipped.
+
+    Args:
+        input_dict: Agent input. Keys read: ``owner`` (default ``"apache"``),
+            ``all_repos`` and ``clear_cache`` (truthy for ``true``/``1``/
+            ``yes``), ``repos`` (comma-separated names), ``github_pat``
+            (required).
+        tools: Mapping keyed by remote MCP URL; only used to build ``mcpc``.
+
+    Returns:
+        ``{"outputText": ...}`` holding either an error message or the
+        counters dict serialized as JSON.
+    """
+    # Neither module is imported at file level. Import locally so the agent
+    # does not depend on the host injecting them (harmless if it does).
+    import asyncio
+    import json
+
+    # Template boilerplate: the MCP clients are not used below.
+    mcpc = {url: RemoteMCPClient(remote_url=url) for url in tools.keys()}
+    http_client = httpx.AsyncClient()
+    try:
+        owner = input_dict.get("owner", "apache")
+        all_repos_raw = input_dict.get("all_repos", "false")
+        # `or ""` tolerates an explicit None instead of failing on .strip().
+        repos_str = str(input_dict.get("repos") or "").strip()
+        github_pat = str(input_dict.get("github_pat") or "").strip()
+        clear_cache_raw = input_dict.get("clear_cache", "false")
+
+        truthy = ("true", "1", "yes")
+        all_repos = str(all_repos_raw).lower().strip() in truthy
+        clear_cache = str(clear_cache_raw).lower().strip() in truthy
+
+        if not github_pat:
+            return {"outputText": "Error: `github_pat` is required.\n"
+                    "Create a fine-grained PAT with **Contents: read** at https://github.com/settings/tokens"}
+
+        if not all_repos and not repos_str:
+            return {"outputText": "Error: provide `repos` or set `all_repos` to `true`."}
+
+        GITHUB_API = "https://api.github.com"
+        gh_headers = {"Accept": "application/vnd.github.v3+json",
+                      "Authorization": f"token {github_pat}"}
+
+        # `data_store` is not defined in this file; presumably it is provided
+        # by the agent runtime.
+        workflow_cache = data_store.use_namespace(f"ci-workflows:{owner}")
+        classification_cache = data_store.use_namespace(f"ci-classification:{owner}")
+
+        if clear_cache:
+            print("Clearing workflow cache...", flush=True)
+            for key in workflow_cache.list_keys():
+                workflow_cache.delete(key)
+            print("Cache cleared.", flush=True)
+
+        # --- Preflight ---
+        preflight_resp = await http_client.get(f"{GITHUB_API}/rate_limit",
+                                               headers=gh_headers, timeout=15.0)
+        if preflight_resp.status_code == 401:
+            return {"outputText": "Error: GitHub PAT is invalid or expired (HTTP 401)."}
+        if preflight_resp.status_code == 200:
+            rate = preflight_resp.json().get("resources", {}).get("core", {})
+            print(f"GitHub API: {rate.get('remaining', '?')}/{rate.get('limit', '?')} remaining", flush=True)
+
+        # --- GitHub GET with retry/rate-limit handling ---
+        async def github_get(url, params=None):
+            """GET ``url`` with up to 5 attempts; return the response or None."""
+            for attempt in range(5):
+                try:
+                    resp = await http_client.get(url, headers=gh_headers,
+                                                 params=params, timeout=30.0)
+                except Exception as e:
+                    print(f" HTTP error (attempt {attempt+1}): {str(e)[:80]}", flush=True)
+                    if attempt < 4:
+                        await asyncio.sleep(2 ** attempt)  # exponential backoff
+                        continue
+                    return None
+
+                if resp.status_code == 429:
+                    try:
+                        wait = int(resp.headers.get("Retry-After", "60"))
+                    except ValueError:
+                        # Retry-After may also be an HTTP-date; use the default.
+                        wait = 60
+                    print(f" Rate limited, waiting {wait}s...", flush=True)
+                    await asyncio.sleep(min(max(wait, 0), 120))
+                    continue
+
+                if resp.status_code == 403:
+                    if resp.headers.get("X-RateLimit-Remaining", "") == "0":
+                        print(" Rate limit exhausted, waiting 60s...", flush=True)
+                        await asyncio.sleep(60)
+                        continue
+
+                # Slow down as the remaining request budget gets small.
+                remaining = resp.headers.get("X-RateLimit-Remaining")
+                if remaining:
+                    try:
+                        rem = int(remaining)
+                        if rem < 50:
+                            print(f" WARNING: {rem} API requests remaining", flush=True)
+                            await asyncio.sleep(5)
+                        elif rem < 100:
+                            await asyncio.sleep(2)
+                    except ValueError:
+                        pass
+
+                return resp
+            return None
+
+        # --- Step 1: Get repo list ---
+        if all_repos:
+            print(f"Fetching all repos for {owner}...", flush=True)
+            repo_names = []
+            skipped = 0
+            page = 1
+            while True:
+                resp = await github_get(
+                    f"{GITHUB_API}/orgs/{owner}/repos",
+                    params={"per_page": 100, "page": page, "sort": "pushed",
+                            "type": "public"})
+                if resp is None or resp.status_code != 200:
+                    break
+                data = resp.json()
+                if not data or not isinstance(data, list):
+                    break
+                for r in data:
+                    if isinstance(r, dict) and "name" in r:
+                        if r.get("archived"):
+                            skipped += 1
+                        else:
+                            repo_names.append(r["name"])
+                if 'rel="next"' not in resp.headers.get("Link", ""):
+                    break
+                page += 1
+                await asyncio.sleep(0.3)
+            print(f"Found {len(repo_names)} active repos ({skipped} archived skipped)", flush=True)
+        else:
+            repo_names = [r.strip() for r in repos_str.split(",") if r.strip()]
+            print(f"Using provided list of {len(repo_names)} repos", flush=True)
+
+        if not repo_names:
+            return {"outputText": "No repositories found."}
+
+        # --- Build index of what's already cached ---
+        print("Building cache index...", flush=True)
+        all_cached_keys = set(workflow_cache.list_keys())
+        print(f" {len(all_cached_keys)} keys in workflow cache", flush=True)
+
+        def has_prefetch(repo_name):
+            meta = workflow_cache.get(f"__prefetch__:{repo_name}")
+            return meta and meta.get("complete")
+
+        def has_composites(repo_name):
+            meta = workflow_cache.get(f"__composites__:{repo_name}")
+            return meta and meta.get("complete")
+
+        def is_classified(repo_name):
+            meta = classification_cache.get(f"__meta__:{repo_name}")
+            return meta and meta.get("complete")
+
+        # --- Semaphore for concurrent fetches ---
+        api_sem = asyncio.Semaphore(10)
+
+        # --- Step 2: Prefetch workflows ---
+        stats = {"repos": 0, "wf_skipped": 0, "wf_fetched": 0,
+                 "wf_no_workflows": 0,
+                 "wf_yaml_cached": 0, "wf_yaml_existed": 0,
+                 "ca_skipped": 0, "ca_fetched": 0, "ca_repos_with": 0,
+                 "ca_total": 0,
+                 "errors": 0}
+
+        async def fetch_single_yaml(repo_name, wf_name, download_url):
+            """Fetch one YAML file if not already cached.
+
+            Returns ``(success, was_cached)``.
+            """
+            cache_key = f"{repo_name}/{wf_name}"
+            if cache_key in all_cached_keys:
+                return True, True  # success, was_cached
+
+            async with api_sem:
+                try:
+                    resp = await http_client.get(download_url,
+                                                 follow_redirects=True, timeout=30.0)
+                    if resp.status_code == 200:
+                        workflow_cache.set(cache_key, resp.text)
+                        all_cached_keys.add(cache_key)
+                        return True, False  # success, newly fetched
+                except Exception:
+                    # Best effort: a failed download is reported as not cached.
+                    pass
+            return False, False
+
+        def mark_no_workflows(repo_name):
+            """Record that ``repo_name`` has no workflow YAML files."""
+            workflow_cache.set(f"__prefetch__:{repo_name}",
+                               {"complete": True, "workflows": []})
+            stats["wf_no_workflows"] += 1
+
+        for idx, repo_name in enumerate(repo_names):
+            stats["repos"] += 1
+
+            if (idx + 1) % 50 == 0 or idx == 0:
+                print(f"[{idx + 1}/{len(repo_names)}] Prefetching {repo_name}... "
+                      f"({stats['wf_fetched']} fetched, {stats['wf_skipped']} skipped, "
+                      f"{stats['wf_yaml_cached']} YAMLs, "
+                      f"{stats['ca_total']} composites)", flush=True)
+
+            # ---- Workflows ----
+            if is_classified(repo_name) or has_prefetch(repo_name):
+                stats["wf_skipped"] += 1
+            else:
+                resp = await github_get(
+                    f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
+
+                if resp is None:
+                    stats["errors"] += 1
+                elif resp.status_code == 404:
+                    mark_no_workflows(repo_name)
+                elif resp.status_code != 200:
+                    stats["errors"] += 1
+                else:
+                    try:
+                        dir_listing = resp.json()
+                    except Exception:
+                        dir_listing = None
+
+                    # Anything that is not a JSON list yields no YAML files.
+                    yaml_files = []
+                    if isinstance(dir_listing, list):
+                        yaml_files = [f for f in dir_listing
+                                      if isinstance(f, dict)
+                                      and f.get("name", "").endswith((".yml", ".yaml"))]
+
+                    if not yaml_files:
+                        mark_no_workflows(repo_name)
+                    else:
+                        # Fetch all YAML concurrently
+                        tasks = []
+                        wf_names = []
+                        for wf_file in yaml_files:
+                            wf_name = wf_file.get("name", "unknown")
+                            wf_names.append(wf_name)
+                            dl_url = wf_file.get("download_url")
+                            if dl_url:
+                                tasks.append(fetch_single_yaml(repo_name, wf_name, dl_url))
+
+                        results = await asyncio.gather(*tasks, return_exceptions=True)
+                        for r in results:
+                            if isinstance(r, Exception):
+                                stats["errors"] += 1
+                                continue
+                            success, was_cached = r
+                            if success:
+                                if was_cached:
+                                    stats["wf_yaml_existed"] += 1
+                                else:
+                                    stats["wf_yaml_cached"] += 1
+
+                        workflow_cache.set(f"__prefetch__:{repo_name}",
+                                           {"complete": True, "workflows": wf_names})
+                        stats["wf_fetched"] += 1
+
+            # ---- Composite actions ----
+            if has_composites(repo_name):
+                stats["ca_skipped"] += 1
+            else:
+                resp = await github_get(
+                    f"{GITHUB_API}/repos/{owner}/{repo_name}/git/trees/HEAD?recursive=1")
+
+                composite_names = []
+                # Only a definitive answer may be cached as "complete":
+                # the security agent trusts this marker and skips its own
+                # GitHub fallback, so a transient failure must not be
+                # recorded as "this repo has no composite actions".
+                scan_ok = False
+                if resp is not None and resp.status_code == 200:
+                    try:
+                        payload = resp.json()
+                        if payload.get("truncated"):
+                            print(f" WARNING: tree for {repo_name} is truncated; "
+                                  "some composite actions may be missed", flush=True)
+                        tree = payload.get("tree", [])
+                        action_files = [
+                            item["path"] for item in tree
+                            if item.get("path", "").startswith(".github/actions/")
+                            and item.get("path", "").endswith(("/action.yml", "/action.yaml"))
+                            and item.get("type") == "blob"
+                        ]
+
+                        for action_path in action_files:
+                            # .github/actions/build/rust/action.yml -> build/rust
+                            action_name = action_path.replace(".github/actions/", "").rsplit("/", 1)[0]
+                            # The cache key always uses "action.yml", even
+                            # when the file in the repo is "action.yaml".
+                            short_path = f".github/actions/{action_name}/action.yml"
+                            cache_key = f"{repo_name}/{short_path}"
+
+                            # Already cached?
+                            if cache_key in all_cached_keys:
+                                composite_names.append(short_path)
+                                continue
+
+                            # Fetch it
+                            async with api_sem:
+                                aresp = await github_get(
+                                    f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/{action_path}")
+                            if aresp is None or aresp.status_code != 200:
+                                continue
+                            try:
+                                dl_url = aresp.json().get("download_url")
+                                if dl_url:
+                                    dl_resp = await http_client.get(
+                                        dl_url, follow_redirects=True, timeout=10.0)
+                                    if dl_resp.status_code == 200:
+                                        workflow_cache.set(cache_key, dl_resp.text)
+                                        all_cached_keys.add(cache_key)
+                                        composite_names.append(short_path)
+                                        stats["ca_total"] += 1
+                            except Exception as e:
+                                # Best effort per file, but no longer silent.
+                                print(f" Error fetching {repo_name}/{action_path}: {str(e)[:80]}", flush=True)
+
+                        scan_ok = True
+                    except Exception as e:
+                        print(f" Error scanning tree for {repo_name}: {str(e)[:80]}", flush=True)
+                elif resp is not None and resp.status_code in (404, 409):
+                    # Missing or empty repository: definitively no tree.
+                    scan_ok = True
+
+                if scan_ok:
+                    if composite_names:
+                        stats["ca_repos_with"] += 1
+
+                    workflow_cache.set(f"__composites__:{repo_name}", {
+                        "complete": True,
+                        "actions": composite_names,
+                    })
+                    stats["ca_fetched"] += 1
+                else:
+                    # Leave the marker unset so a later run retries this repo.
+                    stats["errors"] += 1
+
+        print(f"\n{'=' * 60}", flush=True)
+        print("Prefetch complete!", flush=True)
+        print(f" Repos processed: {stats['repos']}", flush=True)
+        print(" Workflows:", flush=True)
+        print(f" Skipped (already done): {stats['wf_skipped']}", flush=True)
+        print(f" Newly fetched: {stats['wf_fetched']}", flush=True)
+        print(f" No workflows: {stats['wf_no_workflows']}", flush=True)
+        print(f" YAML files cached: {stats['wf_yaml_cached']} (already existed: {stats['wf_yaml_existed']})", flush=True)
+        print(" Composite actions:", flush=True)
+        print(f" Skipped (already done): {stats['ca_skipped']}", flush=True)
+        print(f" Repos scanned: {stats['ca_fetched']}", flush=True)
+        print(f" Repos with composites: {stats['ca_repos_with']}", flush=True)
+        print(f" Action files cached: {stats['ca_total']}", flush=True)
+        print(f" Errors: {stats['errors']}", flush=True)
+        print(f"{'=' * 60}\n", flush=True)
+
+        return {"outputText": json.dumps(stats, indent=2)}
+
+    finally:
+        await http_client.aclose()
\ No newline at end of file
diff --git a/repos/apache/github-review/agents/publishing.py
b/repos/apache/github-review/agents/publishing.py
index 662f9ed..c9ee349 100644
--- a/repos/apache/github-review/agents/publishing.py
+++ b/repos/apache/github-review/agents/publishing.py
@@ -407,19 +407,24 @@ async def run(input_dict, tools):
print(f"\nStarting workflow scan of {len(repo_names)} repos...\n",
flush=True)
- # ===== STEP 2: Fetch workflows and classify =====
+ # ===== STEP 2: Fetch workflows and classify (parallel) =====
all_results = {}
stats = {"repos_scanned": 0, "repos_with_workflows": 0,
"total_workflows": 0,
"total_classified": 0, "cache_hits": 0, "errors": []}
+ # Phase 1: Collect work items (sequential — checks caches, minimal API)
+ pending_llm = [] # [(repo_name, wf_name, yaml_content)]
+ repo_wf_names = {} # repo -> [wf_names] for setting meta after
classification
+
for repo_idx, repo_name in enumerate(repo_names):
stats["repos_scanned"] += 1
- if (repo_idx + 1) % 25 == 0 or repo_idx == 0:
- print(f"[{repo_idx + 1}/{len(repo_names)}] Scanning
{repo_name}... "
- f"({stats['total_workflows']} wfs,
{stats['total_classified']} classified, "
- f"{stats['cache_hits']} cached)", flush=True)
+ if (repo_idx + 1) % 100 == 0 or repo_idx == 0:
+ print(f"[{repo_idx + 1}/{len(repo_names)}] Collecting
{repo_name}... "
+ f"({stats['total_workflows']} wfs, {stats['cache_hits']}
cached, "
+ f"{len(pending_llm)} pending LLM)", flush=True)
+ # Already fully classified?
meta_key = f"__meta__:{repo_name}"
cached_meta = classification_cache.get(meta_key)
@@ -439,99 +444,151 @@ async def run(input_dict, tools):
stats["total_classified"] += len(repo_results)
continue
- resp = await
github_get(f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
+ # Check prefetch cache first (from prefetch agent)
+ prefetch_meta =
workflow_content_cache.get(f"__prefetch__:{repo_name}")
+ wf_names_list = None
+ if prefetch_meta and prefetch_meta.get("complete"):
+ wf_names_list = prefetch_meta.get("workflows", [])
+ if not wf_names_list:
+ # Prefetch confirmed no workflows
+ classification_cache.set(meta_key, {"complete": True,
"workflows": []})
+ continue
- if resp is None:
- stats["errors"].append(f"{owner}/{repo_name}: network error")
- continue
- if resp.status_code == 404:
- classification_cache.set(meta_key, {"complete": True,
"workflows": []})
- continue
- if resp.status_code != 200:
- stats["errors"].append(f"{owner}/{repo_name}: HTTP
{resp.status_code}")
- continue
+ # If no prefetch data, fetch from GitHub API
+ if wf_names_list is None:
+ resp = await
github_get(f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
- try:
- dir_listing = resp.json()
- except Exception:
- classification_cache.set(meta_key, {"complete": True,
"workflows": []})
- continue
+ if resp is None:
+ stats["errors"].append(f"{owner}/{repo_name}: network
error")
+ continue
+ if resp.status_code == 404:
+ classification_cache.set(meta_key, {"complete": True,
"workflows": []})
+ continue
+ if resp.status_code != 200:
+ stats["errors"].append(f"{owner}/{repo_name}: HTTP
{resp.status_code}")
+ continue
- if not isinstance(dir_listing, list):
- classification_cache.set(meta_key, {"complete": True,
"workflows": []})
- continue
+ try:
+ dir_listing = resp.json()
+ except Exception:
+ classification_cache.set(meta_key, {"complete": True,
"workflows": []})
+ continue
- yaml_files = [f for f in dir_listing
- if isinstance(f, dict) and f.get("name",
"").endswith((".yml", ".yaml"))]
+ if not isinstance(dir_listing, list):
+ classification_cache.set(meta_key, {"complete": True,
"workflows": []})
+ continue
- if not yaml_files:
- classification_cache.set(meta_key, {"complete": True,
"workflows": []})
- continue
+ yaml_files = [f for f in dir_listing
+ if isinstance(f, dict) and f.get("name",
"").endswith((".yml", ".yaml"))]
+ wf_names_list = [f.get("name", "unknown") for f in yaml_files]
- stats["repos_with_workflows"] += 1
- repo_results = []
- workflow_names = []
+ if not wf_names_list:
+ classification_cache.set(meta_key, {"complete": True,
"workflows": []})
+ continue
- for wf_file in yaml_files:
- wf_name = wf_file.get("name", "unknown")
- workflow_names.append(wf_name)
- stats["total_workflows"] += 1
+ # Fetch YAML for any not already in content cache
+ for wf_file in yaml_files:
+ wf_name = wf_file.get("name", "unknown")
+ cache_key = f"{repo_name}/{wf_name}"
+ if not workflow_content_cache.get(cache_key):
+ raw_url = wf_file.get("download_url")
+ if raw_url:
+ try:
+ content_resp = await http_client.get(raw_url,
follow_redirects=True, timeout=30.0)
+ if content_resp.status_code == 200:
+ workflow_content_cache.set(cache_key,
content_resp.text)
+ except Exception:
+ pass
- wf_cache_key = f"{repo_name}:{wf_name}"
- cached_cls = classification_cache.get(wf_cache_key)
+ stats["repos_with_workflows"] += 1
+ stats["total_workflows"] += len(wf_names_list)
+ repo_wf_names[repo_name] = wf_names_list
+
+ # Check each workflow: cached classification or needs LLM?
+ for wf_name in wf_names_list:
+ cached_cls = classification_cache.get(f"{repo_name}:{wf_name}")
if cached_cls:
- repo_results.append(cached_cls)
- stats["total_classified"] += 1
+ all_results.setdefault(repo_name, []).append(cached_cls)
stats["cache_hits"] += 1
+ stats["total_classified"] += 1
continue
- raw_url = wf_file.get("download_url")
- yaml_content = None
- if raw_url:
- try:
- content_resp = await http_client.get(raw_url,
follow_redirects=True, timeout=30.0)
- if content_resp.status_code == 200:
- yaml_content = content_resp.text
- except Exception:
- pass
-
+ # Get YAML from content cache
+ yaml_content =
workflow_content_cache.get(f"{repo_name}/{wf_name}")
if yaml_content is None:
- repo_results.append({"file": wf_name, "error": "Could not
fetch", "publishes_to_registry": None})
+ all_results.setdefault(repo_name, []).append(
+ {"file": wf_name, "error": "Could not fetch",
"publishes_to_registry": None})
continue
- workflow_content_cache.set(f"{repo_name}/{wf_name}",
yaml_content)
- yaml_content = truncate_yaml(yaml_content)
+ pending_llm.append((repo_name, wf_name, yaml_content))
- llm_response = None
- try:
- messages = [{"role": "user", "content": (
- f"{CLASSIFICATION_PROMPT}\n\n---\n"
- f"File:
{owner}/{repo_name}/.github/workflows/{wf_name}\n---\n\n{yaml_content}"
- )}]
-
- llm_response, _ = await call_llm(
- provider=provider, model=model, messages=messages,
- parameters=configured_params, user_service=None,
user_id=None)
-
- classification = parse_classification(llm_response)
- classification["file"] = wf_name
- repo_results.append(classification)
- classification_cache.set(wf_cache_key, classification)
- stats["total_classified"] += 1
+ print(f"\nCollection complete: {stats['repos_scanned']} repos, "
+ f"{stats['total_workflows']} workflows, {stats['cache_hits']}
cached, "
+ f"{len(pending_llm)} need LLM classification\n", flush=True)
- except json.JSONDecodeError:
- repo_results.append({"file": wf_name, "error": "JSON parse
error",
- "raw_response": (llm_response or
"")[:300], "publishes_to_registry": None})
-
stats["errors"].append(f"{owner}/{repo_name}/.github/workflows/{wf_name}: JSON
parse error")
- except Exception as e:
- repo_results.append({"file": wf_name, "error":
str(e)[:200], "publishes_to_registry": None})
-
stats["errors"].append(f"{owner}/{repo_name}/.github/workflows/{wf_name}:
{str(e)[:80]}")
+ # Phase 2: Parallel LLM classification
+ if pending_llm:
+ llm_sem = asyncio.Semaphore(5)
+ completed = {"count": 0}
- await asyncio.sleep(0.3)
+ async def classify_workflow(repo_name, wf_name, yaml_content):
+ yaml_truncated = truncate_yaml(yaml_content)
+ messages = [{"role": "user", "content": (
+ f"{CLASSIFICATION_PROMPT}\n\n---\n"
+ f"File:
{owner}/{repo_name}/.github/workflows/{wf_name}\n---\n\n{yaml_truncated}"
+ )}]
+
+ async with llm_sem:
+ llm_response = None
+ try:
+ llm_response, _ = await call_llm(
+ provider=provider, model=model, messages=messages,
+ parameters=configured_params, user_service=None,
user_id=None)
+
+ classification = parse_classification(llm_response)
+ classification["file"] = wf_name
+ classification_cache.set(f"{repo_name}:{wf_name}",
classification)
+
+ completed["count"] += 1
+ if completed["count"] % 25 == 0:
+ print(f" Classified
{completed['count']}/{len(pending_llm)}...", flush=True)
+
+ return repo_name, wf_name, classification, None
+
+ except json.JSONDecodeError:
+ error_result = {"file": wf_name, "error": "JSON parse
error",
+ "raw_response": (llm_response or
"")[:300],
+ "publishes_to_registry": None}
+ return repo_name, wf_name, error_result, \
+ f"{owner}/{repo_name}/.github/workflows/{wf_name}:
JSON parse error"
+
+ except Exception as e:
+ error_result = {"file": wf_name, "error": str(e)[:200],
+ "publishes_to_registry": None}
+ return repo_name, wf_name, error_result, \
+ f"{owner}/{repo_name}/.github/workflows/{wf_name}:
{str(e)[:80]}"
+
+ print(f"Starting parallel classification of {len(pending_llm)}
workflows "
+ f"(concurrency=5)...\n", flush=True)
+
+ tasks = [classify_workflow(r, w, y) for r, w, y in pending_llm]
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+
+ for result in results:
+ if isinstance(result, Exception):
+ stats["errors"].append(f"Unexpected error:
{str(result)[:100]}")
+ continue
+
+ repo_name, wf_name, classification, error_msg = result
+ all_results.setdefault(repo_name, []).append(classification)
+ stats["total_classified"] += 1
+ if error_msg:
+ stats["errors"].append(error_msg)
- if repo_results:
- all_results[repo_name] = repo_results
- classification_cache.set(meta_key, {"complete": True, "workflows":
workflow_names})
+ # Phase 3: Set meta keys for all processed repos
+ for repo_name, wf_names in repo_wf_names.items():
+ classification_cache.set(f"__meta__:{repo_name}", {
+ "complete": True, "workflows": wf_names})
print(f"\n{'=' * 60}", flush=True)
print(f"Scan complete! {stats['repos_scanned']} repos,
{stats['total_classified']} classified "
diff --git a/repos/apache/github-review/agents/security.py
b/repos/apache/github-review/agents/security.py
index 71ab0bb..fedbe4e 100644
--- a/repos/apache/github-review/agents/security.py
+++ b/repos/apache/github-review/agents/security.py
@@ -487,111 +487,120 @@ async def run(input_dict, tools):
"detail": "No dependabot.yml or renovate.json found.",
})
- # Check 9: Composite actions via recursive Git Trees API
- # One API call gets the entire tree, handles any nesting depth
+ # Check 9: Composite actions — read from prefetch cache or fall back to GitHub
composite_findings = []
composite_analyzed = 0
composite_total = 0
- resp = await github_get(
-
f"{GITHUB_API}/repos/{owner}/{repo_name}/git/trees/HEAD?recursive=1")
- if resp and resp.status_code == 200:
- try:
- tree = resp.json().get("tree", [])
- action_files = [
- item["path"] for item in tree
- if item.get("path", "").startswith(".github/actions/")
- and item.get("path", "").endswith(("/action.yml",
"/action.yaml"))
- and item.get("type") == "blob"
- ]
- composite_total = len(action_files)
-
- for action_path in action_files:
- # Extract action name: .github/actions/build/rust/action.yml -> build/rust
- action_name = action_path.replace(".github/actions/",
"").rsplit("/", 1)[0]
-
- # Fetch the action.yml content
- aresp = await github_get(
-
f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/{action_path}")
- if not aresp or aresp.status_code != 200:
- continue
-
- try:
- dl_url = aresp.json().get("download_url")
- if not dl_url:
+ # Collect (action_name, short_path, action_content) tuples
+ composite_items = []
+
+ composites_meta = workflow_ns.get(f"__composites__:{repo_name}")
+ if composites_meta and composites_meta.get("complete"):
+ # Read from prefetch cache
+ cached_actions = composites_meta.get("actions", [])
+ for short_path in cached_actions:
+ action_content =
workflow_ns.get(f"{repo_name}/{short_path}")
+ if action_content:
+ action_name = short_path.replace(".github/actions/",
"").rsplit("/", 1)[0]
+ composite_items.append((action_name, short_path,
action_content))
+ else:
+ # Fall back to GitHub API
+ resp = await github_get(
+
f"{GITHUB_API}/repos/{owner}/{repo_name}/git/trees/HEAD?recursive=1")
+ if resp and resp.status_code == 200:
+ try:
+ tree = resp.json().get("tree", [])
+ action_files = [
+ item["path"] for item in tree
+ if item.get("path",
"").startswith(".github/actions/")
+ and item.get("path", "").endswith(("/action.yml",
"/action.yaml"))
+ and item.get("type") == "blob"
+ ]
+ for action_path in action_files:
+ action_name =
action_path.replace(".github/actions/", "").rsplit("/", 1)[0]
+ aresp = await github_get(
+
f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/{action_path}")
+ if not aresp or aresp.status_code != 200:
continue
- dl_resp = await http_client.get(dl_url,
follow_redirects=True, timeout=10.0)
- if dl_resp.status_code != 200:
+ try:
+ dl_url = aresp.json().get("download_url")
+ if not dl_url:
+ continue
+ dl_resp = await http_client.get(dl_url,
follow_redirects=True, timeout=10.0)
+ if dl_resp.status_code != 200:
+ continue
+ action_content = dl_resp.text
+ short_path =
f".github/actions/{action_name}/action.yml"
+ workflow_ns.set(f"{repo_name}/{short_path}",
action_content)
+ composite_items.append((action_name,
short_path, action_content))
+ except Exception:
continue
- action_content = dl_resp.text
- except Exception:
- continue
+ except Exception as e:
+ print(f" Error scanning composite actions for
{repo_name}: {str(e)[:100]}", flush=True)
- composite_analyzed += 1
- short_path =
f".github/actions/{action_name}/action.yml"
-
- # Store for other agents
- workflow_ns.set(f"{repo_name}/{short_path}",
action_content)
-
- # Run injection checks
- context = f"composite action
.github/actions/{action_name}"
- injections =
find_injection_in_run_blocks(action_content, context_label=context)
- for sev, detail in injections:
- composite_findings.append({
- "check": "composite_action_injection",
- "severity": sev,
- "file": short_path,
- "detail": detail,
- })
-
- # Check unpinned actions inside composite
- ca_refs = extract_action_refs(action_content)
- for ref in ca_refs:
- parsed = parse_action_ref(ref)
- if parsed["type"] == "remote" and not
parsed["pinned"]:
- composite_findings.append({
- "check": "composite_action_unpinned",
- "severity": "MEDIUM",
- "file": short_path,
- "detail": (f"Composite action uses
unpinned action `{parsed['raw']}`. "
- "Supply chain risk."),
- })
+ composite_total = len(composite_items)
- # Check inputs.* directly in run blocks (hidden injection)
- has_input_injection = False
- in_run = False
- run_indent = 0
- for cline in action_content.split("\n"):
- cs = cline.strip()
- if cs.startswith("run:"):
- in_run = True
- run_indent = len(cline) - len(cline.lstrip())
- rest = cs[4:].strip()
- if rest.startswith("|") or
rest.startswith(">"):
- continue
- if "inputs." in rest and "${{" in rest:
- has_input_injection = True
- break
- elif in_run:
- ci = len(cline) - len(cline.lstrip())
- if cs and ci <= run_indent:
- in_run = False
- elif "inputs." in cline and "${{" in cline:
- has_input_injection = True
- break
-
- if has_input_injection:
- composite_findings.append({
- "check": "composite_action_input_injection",
- "severity": "HIGH",
- "file": short_path,
- "detail": (f"Composite action `{action_name}`
directly interpolates "
- "`inputs.*` in run block. Callers
may pass untrusted values — "
- "the injection is hidden from
workflow-level analysis."),
- })
-
- except Exception as e:
- print(f" Error scanning composite actions for
{repo_name}: {str(e)[:100]}", flush=True)
+ # Analyze each composite action
+ for action_name, short_path, action_content in composite_items:
+ composite_analyzed += 1
+
+ # Run injection checks
+ context = f"composite action .github/actions/{action_name}"
+ injections = find_injection_in_run_blocks(action_content,
context_label=context)
+ for sev, detail in injections:
+ composite_findings.append({
+ "check": "composite_action_injection",
+ "severity": sev,
+ "file": short_path,
+ "detail": detail,
+ })
+
+ # Check unpinned actions inside composite
+ ca_refs = extract_action_refs(action_content)
+ for ref in ca_refs:
+ parsed = parse_action_ref(ref)
+ if parsed["type"] == "remote" and not parsed["pinned"]:
+ composite_findings.append({
+ "check": "composite_action_unpinned",
+ "severity": "MEDIUM",
+ "file": short_path,
+ "detail": (f"Composite action uses unpinned action
`{parsed['raw']}`. "
+ "Supply chain risk."),
+ })
+
+ # Check inputs.* directly in run blocks (hidden injection)
+ has_input_injection = False
+ in_run = False
+ run_indent = 0
+ for cline in action_content.split("\n"):
+ cs = cline.strip()
+ if cs.startswith("run:"):
+ in_run = True
+ run_indent = len(cline) - len(cline.lstrip())
+ rest = cs[4:].strip()
+ if rest.startswith("|") or rest.startswith(">"):
+ continue
+ if "inputs." in rest and "${{" in rest:
+ has_input_injection = True
+ break
+ elif in_run:
+ ci = len(cline) - len(cline.lstrip())
+ if cs and ci <= run_indent:
+ in_run = False
+ elif "inputs." in cline and "${{" in cline:
+ has_input_injection = True
+ break
+
+ if has_input_injection:
+ composite_findings.append({
+ "check": "composite_action_input_injection",
+ "severity": "HIGH",
+ "file": short_path,
+ "detail": (f"Composite action `{action_name}` directly
interpolates "
+ "`inputs.*` in run block. Callers may pass
untrusted values — "
+ "the injection is hidden from
workflow-level analysis."),
+ })
# Deduplicate composite findings per file before adding
composite_findings = deduplicate_findings(composite_findings)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]