This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 23233760c43 Improve auto-triage display labels and log extraction
(#63687)
23233760c43 is described below
commit 23233760c436b1477fd9569a0a72fe35041449dd
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Mar 16 22:01:45 2026 +0100
Improve auto-triage display labels and log extraction (#63687)
* Improve auto-triage display labels and log extraction
- Replace "flagged"/"candidate" with "issues found" in all user-facing
tables and status messages for clearer triage output
- Show per-action result (drafted/commented/closed/skipped) in summary
table instead of generic "flagged"
- Extract log tail (last ~1000 lines) to reduce memory usage on large
CI log files
- Focus error extraction on last error cluster for more relevant output
* Fix mypy errors in pr_commands
- Fix return type of _get_cached_status to Any (payload can be dict or list)
- Annotate renderables list to accept both str and Text objects
---
.../src/airflow_breeze/commands/pr_commands.py | 250 ++++++++++++++++-----
1 file changed, 194 insertions(+), 56 deletions(-)
diff --git a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
index ecf98c185e6..945a38d7db7 100644
--- a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
@@ -25,7 +25,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
from threading import Thread
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
import click
from rich.panel import Panel
@@ -135,6 +135,83 @@ def _save_assessment_cache(github_repository: str,
pr_number: int, head_sha: str
cache_file.write_text(json.dumps({"head_sha": head_sha, "assessment":
assessment}, indent=2))
_STATUS_CACHE_TTL_SECONDS = 4 * 3600  # Cached status results stay valid for 4 hours.


def _get_status_cache_dir(github_repository: str) -> Path:
    """Return (creating it if needed) the directory for cached PR/main status results.

    The repository name is flattened ("apache/airflow" -> "apache_airflow") so it
    can serve as a single path component.
    """
    from airflow_breeze.utils.path_utils import BUILD_CACHE_PATH

    directory = Path(BUILD_CACHE_PATH) / "status_cache" / github_repository.replace("/", "_")
    directory.mkdir(parents=True, exist_ok=True)
    return directory
+
+
def _get_cached_status(github_repository: str, cache_key: str) -> Any:
    """Load a cached status result if it exists and is within the TTL.

    Uses wall-clock time (``time.time()``) because this cache persists across
    process restarts.

    :param github_repository: repository the cache is scoped to (e.g. "apache/airflow").
    :param cache_key: name of the cached entry (filename stem).
    :return: the cached payload (dict or list), or None when the cache file is
        missing, expired, or malformed. Any malformed file is treated as a
        cache miss rather than an error.
    """
    cache_file = _get_status_cache_dir(github_repository) / f"{cache_key}.json"
    if not cache_file.exists():
        return None
    try:
        data = json.loads(cache_file.read_text())
        # A corrupted or hand-edited file may contain a non-dict top level;
        # `.get` would raise AttributeError (previously uncaught) — treat it
        # as a cache miss instead of crashing the triage run.
        if not isinstance(data, dict):
            return None
        cached_at = data.get("cached_at", 0)
        if time.time() - cached_at < _STATUS_CACHE_TTL_SECONDS:
            return data.get("payload")
        return None
    # TypeError added: a non-numeric "cached_at" value would otherwise escape
    # via the subtraction above.
    except (json.JSONDecodeError, KeyError, OSError, TypeError):
        return None
+
+
def _save_status_cache(github_repository: str, cache_key: str, payload: dict | list) -> None:
    """Persist ``payload`` under ``cache_key`` with a wall-clock timestamp.

    The timestamp is later compared against ``_STATUS_CACHE_TTL_SECONDS`` to
    decide whether the entry is still fresh.
    """
    target = _get_status_cache_dir(github_repository) / f"{cache_key}.json"
    envelope = {"cached_at": time.time(), "payload": payload}
    target.write_text(json.dumps(envelope))
+
+
def _cached_fetch_recent_pr_failures(
    token: str, github_repository: str, *, branch: str = "main", hours: int = 4, max_prs: int = 10
) -> RecentPRFailureInfo:
    """Return recent-PR failure info, serving a cached copy while it is fresh.

    The cached payload is a JSON-friendly dict which is rehydrated into a
    ``RecentPRFailureInfo`` on a hit; on a miss, fresh data is fetched and
    written back to the cache in the same serializable shape.
    """
    key = f"recent_pr_failures_{branch}"
    payload = _get_cached_status(github_repository, key)
    if payload is None:
        fresh = _fetch_recent_pr_failures(token, github_repository, branch=branch, hours=hours, max_prs=max_prs)
        _save_status_cache(
            github_repository,
            key,
            {
                "failing_checks": fresh.failing_checks,
                # Sets are not JSON-serializable, so store the names as a list.
                "failing_check_names": list(fresh.failing_check_names),
                "prs_examined": fresh.prs_examined,
            },
        )
        return fresh
    get_console().print("[dim]Using cached recent-PR failure data (expires after 4 h).[/]")
    return RecentPRFailureInfo(
        failing_checks=payload["failing_checks"],
        failing_check_names=set(payload["failing_check_names"]),
        prs_examined=payload["prs_examined"],
    )
+
+
def _cached_fetch_main_canary_builds(
    token: str, github_repository: str, *, branch: str = "main", count: int = 4
) -> list[dict]:
    """Return canary build data, serving a cached copy while it is fresh.

    The payload is already JSON-serializable (a list of dicts), so it is
    stored and returned as-is with no rehydration step.
    """
    key = f"canary_builds_{branch}"
    payload = _get_cached_status(github_repository, key)
    if payload is None:
        payload = _fetch_main_canary_builds(token, github_repository, branch=branch, count=count)
        _save_status_cache(github_repository, key, payload)
    else:
        get_console().print("[dim]Using cached canary build data (expires after 4 h).[/]")
    return payload
+
+
def _cached_assess_pr(
github_repository: str,
head_sha: str,
@@ -490,6 +567,14 @@ class RecentPRFailureInfo:
return matched
@dataclass
class LogSnippetInfo:
    """A short excerpt from a failed CI check's log, plus a link to the full log."""

    # Extracted error/log excerpt that is shown to the user.
    snippet: str
    # html_url of the failed job, rendered as a clickable link ("" when unknown).
    job_url: str
+
@dataclass
class PRData:
"""PR data fetched from GraphQL."""
@@ -2453,20 +2538,37 @@ def _display_pr_panel(pr: PRData, author_profile: dict
| None, assessment):
_display_unresolved_threads_panel(pr)
def _display_log_snippets_panel(log_snippets: dict[str, LogSnippetInfo], pr: PRData | None = None) -> None:
    """Display Rich panel(s) with log snippets from failed CI checks.

    :param log_snippets: mapping of failed check name -> LogSnippetInfo
        (snippet text plus the html_url of the failed job).
    :param pr: optional PR whose number/URL is rendered as a clickable link
        in the panel header.
    """
    # Imported locally to keep rich sub-modules off the module import path.
    from rich.console import Group
    from rich.text import Text

    console = get_console()
    for check_name, info in log_snippets.items():
        # Truncate very long snippets for display
        display_snippet = info.snippet
        if len(display_snippet) > 2000:
            display_snippet = display_snippet[:2000] + "\n... (truncated)"

        # Build content: clickable links header + pre-formatted log text
        renderables: list[Any] = []
        link_parts: list[str] = []
        if pr:
            link_parts.append(f"PR: [link={pr.url}]#{pr.number}[/link]")
        if info.job_url:
            link_parts.append(f"Log: [link={info.job_url}]{info.job_url}[/link]")
        if link_parts:
            renderables.append("  ".join(link_parts))

        # Use Text object to preserve exact whitespace/indentation in log output
        # (a plain str would be parsed for Rich markup and re-wrapped).
        renderables.append(Text(display_snippet))

        console.print(
            Panel(
                Group(*renderables),
                title=f"Failed check logs: {check_name}",
                border_style="red",
                expand=True,
            )
        )
@@ -2476,13 +2578,13 @@ def _launch_background_log_fetching(
github_repository: str,
prs: list,
llm_concurrency: int,
-) -> dict[int, Future[dict[str, str]]]:
+) -> dict[int, Future[dict[str, LogSnippetInfo]]]:
"""Launch background CI log fetching for all PRs with failed checks.
- Returns a dict mapping PR number -> Future[dict[str, str]].
+ Returns a dict mapping PR number -> Future[dict[str, LogSnippetInfo]].
Uses the same concurrency level as LLM assessments.
"""
- log_futures: dict[int, Future[dict[str, str]]] = {}
+ log_futures: dict[int, Future[dict[str, LogSnippetInfo]]] = {}
prs_with_failures = [pr for pr in prs if pr.failed_checks and pr.head_sha]
if not prs_with_failures:
return log_futures
@@ -2616,7 +2718,7 @@ def _llm_progress_status(completed: int, total: int,
flagged: int, errors: int)
remaining = total - completed
parts = [f"{completed}/{total} done"]
if flagged:
- parts.append(f"{flagged} flagged")
+ parts.append(f"{flagged} issues found")
if errors:
parts.append(f"{errors} errors")
if remaining:
@@ -2703,8 +2805,8 @@ class TriageContext:
llm_passing: list
# Main branch failure info (optional)
main_failures: RecentPRFailureInfo | None = None
- # Background CI log fetching: PR number -> Future[dict[str, str]]
- log_futures: dict[int, Future[dict[str, str]]] =
field(default_factory=dict)
+ # Background CI log fetching: PR number -> Future[dict[str,
LogSnippetInfo]]
+ log_futures: dict[int, Future[dict[str, LogSnippetInfo]]] =
field(default_factory=dict)
def collect_llm_progress(self) -> None:
"""Collect completed LLM results and print progress status."""
@@ -2944,7 +3046,7 @@ def _prompt_and_execute_flagged_pr(
if log_future.done():
log_snippets = log_future.result()
if log_snippets:
- _display_log_snippets_panel(log_snippets)
+ _display_log_snippets_panel(log_snippets, pr=pr)
else:
# Logs are still being fetched — offer user to wait or cancel
from airflow_breeze.utils.confirm import _read_char
@@ -2965,7 +3067,7 @@ def _prompt_and_execute_flagged_pr(
try:
log_snippets = log_future.result(timeout=120)
if log_snippets:
- _display_log_snippets_panel(log_snippets)
+ _display_log_snippets_panel(log_snippets, pr=pr)
except TimeoutError:
get_console().print(" [warning]CI log retrieval timed
out.[/]")
except Exception:
@@ -2978,7 +3080,7 @@ def _prompt_and_execute_flagged_pr(
ctx.token, ctx.github_repository, pr.head_sha, pr.failed_checks
)
if log_snippets:
- _display_log_snippets_panel(log_snippets)
+ _display_log_snippets_panel(log_snippets, pr=pr)
# Check if PR failures match main branch failures
main_matching: list[str] = []
@@ -3038,7 +3140,7 @@ def _prompt_and_execute_flagged_pr(
# downgrade it from "report" to regular "flagged" — user has reviewed and
decided.
if action != TriageAction.SKIP and getattr(assessment, "should_report",
False):
assessment.should_report = False
- console_print(" [info]Report status cleared — PR marked as
flagged.[/]")
+ console_print(" [info]Report status cleared — PR marked as issues
found.[/]")
# For actions that post comments, let the user select violations and
preview the comment
draft_comment = ""
@@ -3289,7 +3391,7 @@ def _enrich_candidate_details(
console_print(
f"[info]Fetching check details for {len(candidate_prs)} "
- f"candidate {'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
+ f"{'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
)
_fetch_check_details_batch(token, github_repository, candidate_prs)
@@ -3320,7 +3422,7 @@ def _enrich_candidate_details(
if run_api:
console_print(
f"[info]Fetching review thread details for {len(candidate_prs)} "
- f"candidate {'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
+ f"{'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
)
_fetch_unresolved_comments_batch(token, github_repository,
candidate_prs)
@@ -3364,8 +3466,8 @@ def _review_workflow_approval_prs(ctx: TriageContext,
pending_approval: list[PRD
author_count = ctx.author_flagged_count.get(pr.author_login, 0)
if author_count > 3:
console_print(
- f" [bold red]Author {pr.author_login} has {author_count}
flagged "
- f"{'PRs' if author_count != 1 else 'PR'} "
+ f" [bold red]Author {pr.author_login} has {author_count} "
+ f"{'PRs' if author_count != 1 else 'PR'} with issues "
f"— suggesting close instead of workflow approval.[/]"
)
close_comment = _build_close_comment(
@@ -3779,8 +3881,8 @@ def _review_deterministic_flagged_prs(
return
console_print(
- f"\n[info]Reviewing {len(det_flagged_prs)} deterministically flagged "
- f"{'PRs' if len(det_flagged_prs) != 1 else 'PR'}"
+ f"\n[info]Reviewing {len(det_flagged_prs)} {'PRs' if
len(det_flagged_prs) != 1 else 'PR'} "
+ f"with issues found"
f"{' (LLM assessments running in background)' if ctx.llm_future_to_pr
else ''}...[/]\n"
)
@@ -3796,7 +3898,7 @@ def _review_deterministic_flagged_prs(
count = ctx.author_flagged_count[current_author]
console_print()
get_console().rule(
- f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if
count != 1 else ''})",
+ f"[bold]Author: {current_author}[/] ({count} PR{'s' if count
!= 1 else ''} with issues)",
style="cyan",
)
@@ -3835,8 +3937,8 @@ def _review_llm_flagged_prs(ctx: TriageContext,
llm_candidates: list[PRData]) ->
if remaining:
status_parts.append(f"{remaining} still running")
console_print(
- f"\n[info]{len(new_flagged)} new LLM-flagged "
- f"{'PRs' if len(new_flagged) != 1 else 'PR'} ready for review "
+ f"\n[info]{len(new_flagged)} new "
+ f"{'PRs' if len(new_flagged) != 1 else 'PR'} with LLM issues
found, ready for review "
f"({', '.join(status_parts)}):[/]\n"
)
@@ -3874,7 +3976,7 @@ def _review_llm_flagged_prs(ctx: TriageContext,
llm_candidates: list[PRData]) ->
)
console_print(
- f"\n[info]LLM assessment complete: {len(ctx.llm_assessments)} flagged,
"
+ f"\n[info]LLM assessment complete: {len(ctx.llm_assessments)} issues
found, "
f"{len(ctx.llm_passing)} passed, {len(ctx.llm_errors)} errors "
f"(out of {len(ctx.llm_future_to_pr)} assessed).[/]\n"
)
@@ -4855,8 +4957,8 @@ def _display_triage_summary(
console_print(
f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged
!= 1 else 'PR'} "
- f"flagged ({total_deterministic_flags} CI/conflicts/comments, "
- f"{total_llm_flagged} LLM-flagged"
+ f"with issues found ({total_deterministic_flags}
CI/conflicts/comments, "
+ f"{total_llm_flagged} LLM"
f"{f', {total_llm_errors} LLM errors' if total_llm_errors else ''}"
f"{f', {len(pending_approval)} awaiting workflow approval' if
pending_approval else ''}"
f"{f', {len(workflows_in_progress)} workflows in progress' if
workflows_in_progress else ''}"
@@ -4881,12 +4983,12 @@ def _display_triage_summary(
summary_table.add_row(" Commented (no response)",
str(triaged_waiting_count))
summary_table.add_row(" Triaged (author responded)",
str(triaged_responded_count))
summary_table.add_row("PRs assessed", str(len(candidate_prs)))
- summary_table.add_row("Flagged by CI/conflicts/comments",
str(total_deterministic_flags))
- summary_table.add_row("Flagged by LLM", str(total_llm_flagged))
+ summary_table.add_row("Issues found by CI/conflicts/comments",
str(total_deterministic_flags))
+ summary_table.add_row("Issues found by LLM", str(total_llm_flagged))
if total_llm_report:
summary_table.add_row("[red]Potentially flagged for report[/red]",
f"[red]{total_llm_report}[/red]")
summary_table.add_row("LLM errors (skipped)", str(total_llm_errors))
- summary_table.add_row("Total flagged", str(total_flagged))
+ summary_table.add_row("Total issues found", str(total_flagged))
summary_table.add_row("PRs passing all checks", str(len(passing_prs)))
summary_table.add_row("Drafts with issues (skipped)",
str(len(skipped_drafts)))
summary_table.add_row(
@@ -5118,10 +5220,10 @@ _LOG_DOWNLOAD_MAX_BYTES = 5 * 1024 * 1024 # 5 MB
def _fetch_failed_job_log_snippets(
token: str, github_repository: str, head_sha: str, failed_check_names:
list[str]
-) -> dict[str, str]:
+) -> dict[str, LogSnippetInfo]:
"""Fetch short log snippets from failed GitHub Actions jobs for a commit.
- Returns a dict mapping failed check name -> log snippet (last N lines of
the failed step).
+ Returns a dict mapping failed check name -> LogSnippetInfo (snippet + job
URL).
Only fetches logs for checks in ``failed_check_names`` to limit API calls.
"""
import io
@@ -5141,7 +5243,7 @@ def _fetch_failed_job_log_snippets(
if not failed_runs:
return {}
- snippets: dict[str, str] = {}
+ snippets: dict[str, LogSnippetInfo] = {}
headers = {"Authorization": f"Bearer {token}", "Accept":
"application/vnd.github+json"}
for run in failed_runs:
@@ -5218,7 +5320,7 @@ def _fetch_failed_job_log_snippets(
job_name = job.get("name", "")
snippet = _extract_failed_step_snippet(zf, job_name, job)
if snippet:
- snippets[check_name] = snippet
+ snippets[check_name] = LogSnippetInfo(snippet=snippet,
job_url=job.get("html_url", ""))
zf.close()
@@ -5265,34 +5367,60 @@ def _extract_failed_step_snippet(zf, job_name: str,
job: dict) -> str:
candidate_files.append(zname)
for log_file in candidate_files:
- try:
- log_content = zf.read(log_file).decode("utf-8",
errors="replace")
- except (KeyError, OSError):
+ log_tail = _read_log_tail(zf, log_file)
+ if log_tail is None:
continue
- snippet = _extract_error_lines(log_content, step_name)
+ snippet = _extract_error_lines(log_tail, step_name)
if snippet:
return snippet
# Fallback: try to find any log file for this job and extract errors
for zname in zip_names:
if job_name in zname or job_name.replace("/", "_") in zname:
- try:
- log_content = zf.read(zname).decode("utf-8", errors="replace")
- except (KeyError, OSError):
+ log_tail = _read_log_tail(zf, zname)
+ if log_tail is None:
continue
- snippet = _extract_error_lines(log_content, "")
+ snippet = _extract_error_lines(log_tail, "")
if snippet:
return snippet
return ""
+_LOG_TAIL_BYTES = 128 * 1024 # 128 KB — enough for ~1000 log lines
+_LOG_TAIL_LINES = 1000
+
+
+def _read_log_tail(zf, log_file: str) -> str | None:
+ """Read only the last ~1000 lines of a log file from a zip archive.
+
+ Returns the tail as a string, or None if the file cannot be read.
+ """
+ try:
+ raw = zf.read(log_file)
+ except (KeyError, OSError):
+ return None
+ # Only decode the tail to save memory on large log files
+ if len(raw) > _LOG_TAIL_BYTES:
+ raw = raw[-_LOG_TAIL_BYTES:]
+ # Drop the first (likely partial) line
+ nl = raw.find(b"\n")
+ if nl != -1:
+ raw = raw[nl + 1 :]
+ text = raw.decode("utf-8", errors="replace")
+ lines = text.splitlines()
+ if len(lines) > _LOG_TAIL_LINES:
+ lines = lines[-_LOG_TAIL_LINES:]
+ return "\n".join(lines)
+
+
def _extract_error_lines(log_content: str, step_name: str) -> str:
"""Extract relevant error lines from a log file.
Looks for error markers, then takes surrounding context.
Falls back to the last N lines if no error markers are found.
+ The input is expected to already be truncated to the last ~1000 lines.
"""
lines = log_content.splitlines()
if not lines:
@@ -5310,7 +5438,6 @@ def _extract_error_lines(log_content: str, step_name:
str) -> str:
"AssertionError",
"Exception:",
"Traceback (most recent call last)",
- "✗",
"❌",
]
@@ -5323,14 +5450,14 @@ def _extract_error_lines(log_content: str, step_name:
str) -> str:
error_indices.append(i)
if error_indices:
- # Take a window around the first error cluster
- first_error = error_indices[0]
+ # Take a window around the last error cluster (most relevant failure
info)
last_error = error_indices[-1]
+ first_error = error_indices[0]
- # If errors are spread across the file, focus on the first cluster
- for i in range(1, len(error_indices)):
+ # Walk backwards to find the start of the last cluster
+ for i in range(len(error_indices) - 1, 0, -1):
if error_indices[i] - error_indices[i - 1] > 20:
- last_error = error_indices[i - 1]
+ first_error = error_indices[i]
break
start = max(0, first_error - 3)
@@ -5929,6 +6056,7 @@ def auto_triage(
for label, get_dir in [
("review", _get_review_cache_dir),
("triage", _get_triage_cache_dir),
+ ("status", _get_status_cache_dir),
]:
cache_dir = get_dir(github_repository)
if cache_dir.exists():
@@ -5972,11 +6100,11 @@ def auto_triage(
# Refresh collaborators cache in the background on every run
_refresh_collaborators_cache_in_background(token, github_repository)
- # Preload main branch CI failure information
- main_failures = _fetch_recent_pr_failures(token, github_repository)
+ # Preload main branch CI failure information (cached for 4 hours)
+ main_failures = _cached_fetch_recent_pr_failures(token, github_repository)
- # Show status of recent scheduled (canary) builds on main branch
- canary_builds = _fetch_main_canary_builds(token, github_repository)
+ # Show status of recent scheduled (canary) builds on main branch (cached
for 4 hours)
+ canary_builds = _cached_fetch_main_canary_builds(token, github_repository)
_display_canary_builds_status(canary_builds, token, github_repository)
# Resolve review-requested filter: --reviews-for-me uses authenticated
user, --reviews-for uses specified users
@@ -6663,7 +6791,7 @@ def auto_triage(
accepted_prs.extend(batch_accepted)
if not candidate_prs:
- console_print("[info]No candidates in this batch.[/]")
+ console_print("[info]No PRs to assess in this batch.[/]")
_display_pr_overview_table(all_prs)
continue
@@ -6879,7 +7007,7 @@ def auto_triage(
enrich_total = t_enrich_end - t_enrich_start
timing_table.add_row(
- "Enrich candidates (checks + mergeability + comments)",
+ "Enrich PRs (checks + mergeability + comments)",
_fmt_duration(enrich_total),
str(len(candidate_prs)),
_fmt_duration(enrich_total / num_candidates),
@@ -6978,7 +7106,17 @@ def auto_triage(
if any(pr.number == pr_num for pr in skipped_drafts):
result = "[dim]draft-skipped[/]"
elif pr_num in assessments or pr_num in llm_assessments:
- result = "[red]flagged[/]"
+ action_for_result = pr_actions.get(pr_num, "")
+ if action_for_result == "drafted":
+ result = "[yellow]drafted[/]"
+ elif action_for_result == "commented":
+ result = "[yellow]commented[/]"
+ elif action_for_result == "closed":
+ result = "[red]closed[/]"
+ elif action_for_result == "skipped":
+ result = "[dim]skipped[/]"
+ else:
+ result = "[yellow]issues found[/]"
elif any(pr.number == pr_num for pr in passing_prs) or any(
pr.number == pr_num for pr in llm_passing
):