This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 23233760c43 Improve auto-triage display labels and log extraction 
(#63687)
23233760c43 is described below

commit 23233760c436b1477fd9569a0a72fe35041449dd
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Mar 16 22:01:45 2026 +0100

    Improve auto-triage display labels and log extraction (#63687)
    
    * Improve auto-triage display labels and log extraction
    
    - Replace "flagged"/"candidate" with "issues found" in all user-facing
      tables and status messages for clearer triage output
    - Show per-action result (drafted/commented/closed/skipped) in summary
      table instead of generic "flagged"
    - Extract log tail (last ~1000 lines) to reduce memory usage on large
      CI log files
    - Focus error extraction on last error cluster for more relevant output
    
    * Fix mypy errors in pr_commands
    
    - Fix return type of _get_cached_status to Any (payload can be dict or list)
    - Annotate renderables list to accept both str and Text objects
---
 .../src/airflow_breeze/commands/pr_commands.py     | 250 ++++++++++++++++-----
 1 file changed, 194 insertions(+), 56 deletions(-)

diff --git a/dev/breeze/src/airflow_breeze/commands/pr_commands.py 
b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
index ecf98c185e6..945a38d7db7 100644
--- a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
@@ -25,7 +25,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
 from dataclasses import dataclass, field
 from pathlib import Path
 from threading import Thread
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import click
 from rich.panel import Panel
@@ -135,6 +135,83 @@ def _save_assessment_cache(github_repository: str, 
pr_number: int, head_sha: str
     cache_file.write_text(json.dumps({"head_sha": head_sha, "assessment": 
assessment}, indent=2))
 
 
+_STATUS_CACHE_TTL_SECONDS = 4 * 3600  # 4 hours
+
+
+def _get_status_cache_dir(github_repository: str) -> Path:
+    """Return the directory for storing cached PR/main status results."""
+    from airflow_breeze.utils.path_utils import BUILD_CACHE_PATH
+
+    safe_name = github_repository.replace("/", "_")
+    cache_dir = Path(BUILD_CACHE_PATH) / "status_cache" / safe_name
+    cache_dir.mkdir(parents=True, exist_ok=True)
+    return cache_dir
+
+
+def _get_cached_status(github_repository: str, cache_key: str) -> Any:
+    """Load a cached status result if it exists and is within the TTL.
+
+    Uses wall-clock time (``time.time()``) because this cache persists across 
process restarts.
+    """
+    cache_file = _get_status_cache_dir(github_repository) / f"{cache_key}.json"
+    if not cache_file.exists():
+        return None
+    try:
+        data = json.loads(cache_file.read_text())
+        cached_at = data.get("cached_at", 0)
+        if time.time() - cached_at < _STATUS_CACHE_TTL_SECONDS:
+            return data.get("payload")
+        return None
+    except (json.JSONDecodeError, KeyError, OSError):
+        return None
+
+
+def _save_status_cache(github_repository: str, cache_key: str, payload: dict | 
list) -> None:
+    """Save a status result to the cache with a wall-clock timestamp."""
+    cache_file = _get_status_cache_dir(github_repository) / f"{cache_key}.json"
+    cache_file.write_text(json.dumps({"cached_at": time.time(), "payload": 
payload}))
+
+
+def _cached_fetch_recent_pr_failures(
+    token: str, github_repository: str, *, branch: str = "main", hours: int = 
4, max_prs: int = 10
+) -> RecentPRFailureInfo:
+    """Return cached recent-PR failure info, fetching fresh data when the 
cache expires."""
+    cache_key = f"recent_pr_failures_{branch}"
+    cached = _get_cached_status(github_repository, cache_key)
+    if cached is not None:
+        get_console().print("[dim]Using cached recent-PR failure data (expires 
after 4 h).[/]")
+        return RecentPRFailureInfo(
+            failing_checks=cached["failing_checks"],
+            failing_check_names=set(cached["failing_check_names"]),
+            prs_examined=cached["prs_examined"],
+        )
+    result = _fetch_recent_pr_failures(token, github_repository, 
branch=branch, hours=hours, max_prs=max_prs)
+    _save_status_cache(
+        github_repository,
+        cache_key,
+        {
+            "failing_checks": result.failing_checks,
+            "failing_check_names": list(result.failing_check_names),
+            "prs_examined": result.prs_examined,
+        },
+    )
+    return result
+
+
+def _cached_fetch_main_canary_builds(
+    token: str, github_repository: str, *, branch: str = "main", count: int = 4
+) -> list[dict]:
+    """Return cached canary build data, fetching fresh data when the cache 
expires."""
+    cache_key = f"canary_builds_{branch}"
+    cached = _get_cached_status(github_repository, cache_key)
+    if cached is not None:
+        get_console().print("[dim]Using cached canary build data (expires 
after 4 h).[/]")
+        return cached
+    result = _fetch_main_canary_builds(token, github_repository, 
branch=branch, count=count)
+    _save_status_cache(github_repository, cache_key, result)
+    return result
+
+
 def _cached_assess_pr(
     github_repository: str,
     head_sha: str,
@@ -490,6 +567,14 @@ class RecentPRFailureInfo:
         return matched
 
 
+@dataclass
+class LogSnippetInfo:
+    """Log snippet from a failed CI check, with a link to the full log."""
+
+    snippet: str
+    job_url: str  # html_url of the failed job (for clickable link)
+
+
 @dataclass
 class PRData:
     """PR data fetched from GraphQL."""
@@ -2453,20 +2538,37 @@ def _display_pr_panel(pr: PRData, author_profile: dict 
| None, assessment):
     _display_unresolved_threads_panel(pr)
 
 
-def _display_log_snippets_panel(log_snippets: dict[str, str]) -> None:
+def _display_log_snippets_panel(log_snippets: dict[str, LogSnippetInfo], pr: 
PRData | None = None) -> None:
     """Display Rich panel(s) with log snippets from failed CI checks."""
+    from rich.console import Group
+    from rich.text import Text
+
     console = get_console()
-    for check_name, snippet in log_snippets.items():
+    for check_name, info in log_snippets.items():
         # Truncate very long snippets for display
-        display_snippet = snippet
+        display_snippet = info.snippet
         if len(display_snippet) > 2000:
             display_snippet = display_snippet[:2000] + "\n... (truncated)"
+
+        # Build content: clickable links header + pre-formatted log text
+        renderables: list[Any] = []
+        link_parts: list[str] = []
+        if pr:
+            link_parts.append(f"PR: [link={pr.url}]#{pr.number}[/link]")
+        if info.job_url:
+            link_parts.append(f"Log: 
[link={info.job_url}]{info.job_url}[/link]")
+        if link_parts:
+            renderables.append("  ".join(link_parts))
+
+        # Use Text object to preserve exact whitespace/indentation in log 
output
+        renderables.append(Text(display_snippet))
+
         console.print(
             Panel(
-                display_snippet,
+                Group(*renderables),
                 title=f"Failed check logs: {check_name}",
                 border_style="red",
-                expand=False,
+                expand=True,
             )
         )
 
@@ -2476,13 +2578,13 @@ def _launch_background_log_fetching(
     github_repository: str,
     prs: list,
     llm_concurrency: int,
-) -> dict[int, Future[dict[str, str]]]:
+) -> dict[int, Future[dict[str, LogSnippetInfo]]]:
     """Launch background CI log fetching for all PRs with failed checks.
 
-    Returns a dict mapping PR number -> Future[dict[str, str]].
+    Returns a dict mapping PR number -> Future[dict[str, LogSnippetInfo]].
     Uses the same concurrency level as LLM assessments.
     """
-    log_futures: dict[int, Future[dict[str, str]]] = {}
+    log_futures: dict[int, Future[dict[str, LogSnippetInfo]]] = {}
     prs_with_failures = [pr for pr in prs if pr.failed_checks and pr.head_sha]
     if not prs_with_failures:
         return log_futures
@@ -2616,7 +2718,7 @@ def _llm_progress_status(completed: int, total: int, 
flagged: int, errors: int)
     remaining = total - completed
     parts = [f"{completed}/{total} done"]
     if flagged:
-        parts.append(f"{flagged} flagged")
+        parts.append(f"{flagged} issues found")
     if errors:
         parts.append(f"{errors} errors")
     if remaining:
@@ -2703,8 +2805,8 @@ class TriageContext:
     llm_passing: list
     # Main branch failure info (optional)
     main_failures: RecentPRFailureInfo | None = None
-    # Background CI log fetching: PR number -> Future[dict[str, str]]
-    log_futures: dict[int, Future[dict[str, str]]] = 
field(default_factory=dict)
+    # Background CI log fetching: PR number -> Future[dict[str, 
LogSnippetInfo]]
+    log_futures: dict[int, Future[dict[str, LogSnippetInfo]]] = 
field(default_factory=dict)
 
     def collect_llm_progress(self) -> None:
         """Collect completed LLM results and print progress status."""
@@ -2944,7 +3046,7 @@ def _prompt_and_execute_flagged_pr(
             if log_future.done():
                 log_snippets = log_future.result()
                 if log_snippets:
-                    _display_log_snippets_panel(log_snippets)
+                    _display_log_snippets_panel(log_snippets, pr=pr)
             else:
                 # Logs are still being fetched — offer user to wait or cancel
                 from airflow_breeze.utils.confirm import _read_char
@@ -2965,7 +3067,7 @@ def _prompt_and_execute_flagged_pr(
                     try:
                         log_snippets = log_future.result(timeout=120)
                         if log_snippets:
-                            _display_log_snippets_panel(log_snippets)
+                            _display_log_snippets_panel(log_snippets, pr=pr)
                     except TimeoutError:
                         get_console().print("  [warning]CI log retrieval timed 
out.[/]")
                     except Exception:
@@ -2978,7 +3080,7 @@ def _prompt_and_execute_flagged_pr(
                 ctx.token, ctx.github_repository, pr.head_sha, pr.failed_checks
             )
             if log_snippets:
-                _display_log_snippets_panel(log_snippets)
+                _display_log_snippets_panel(log_snippets, pr=pr)
 
     # Check if PR failures match main branch failures
     main_matching: list[str] = []
@@ -3038,7 +3140,7 @@ def _prompt_and_execute_flagged_pr(
     # downgrade it from "report" to regular "flagged" — user has reviewed and 
decided.
     if action != TriageAction.SKIP and getattr(assessment, "should_report", 
False):
         assessment.should_report = False
-        console_print("  [info]Report status cleared — PR marked as 
flagged.[/]")
+        console_print("  [info]Report status cleared — PR marked as issues 
found.[/]")
 
     # For actions that post comments, let the user select violations and 
preview the comment
     draft_comment = ""
@@ -3289,7 +3391,7 @@ def _enrich_candidate_details(
 
     console_print(
         f"[info]Fetching check details for {len(candidate_prs)} "
-        f"candidate {'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
+        f"{'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
     )
     _fetch_check_details_batch(token, github_repository, candidate_prs)
 
@@ -3320,7 +3422,7 @@ def _enrich_candidate_details(
     if run_api:
         console_print(
             f"[info]Fetching review thread details for {len(candidate_prs)} "
-            f"candidate {'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
+            f"{'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
         )
         _fetch_unresolved_comments_batch(token, github_repository, 
candidate_prs)
 
@@ -3364,8 +3466,8 @@ def _review_workflow_approval_prs(ctx: TriageContext, 
pending_approval: list[PRD
         author_count = ctx.author_flagged_count.get(pr.author_login, 0)
         if author_count > 3:
             console_print(
-                f"  [bold red]Author {pr.author_login} has {author_count} 
flagged "
-                f"{'PRs' if author_count != 1 else 'PR'} "
+                f"  [bold red]Author {pr.author_login} has {author_count} "
+                f"{'PRs' if author_count != 1 else 'PR'} with issues "
                 f"— suggesting close instead of workflow approval.[/]"
             )
             close_comment = _build_close_comment(
@@ -3779,8 +3881,8 @@ def _review_deterministic_flagged_prs(
         return
 
     console_print(
-        f"\n[info]Reviewing {len(det_flagged_prs)} deterministically flagged "
-        f"{'PRs' if len(det_flagged_prs) != 1 else 'PR'}"
+        f"\n[info]Reviewing {len(det_flagged_prs)} {'PRs' if 
len(det_flagged_prs) != 1 else 'PR'} "
+        f"with issues found"
         f"{' (LLM assessments running in background)' if ctx.llm_future_to_pr 
else ''}...[/]\n"
     )
 
@@ -3796,7 +3898,7 @@ def _review_deterministic_flagged_prs(
             count = ctx.author_flagged_count[current_author]
             console_print()
             get_console().rule(
-                f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if 
count != 1 else ''})",
+                f"[bold]Author: {current_author}[/] ({count} PR{'s' if count 
!= 1 else ''} with issues)",
                 style="cyan",
             )
 
@@ -3835,8 +3937,8 @@ def _review_llm_flagged_prs(ctx: TriageContext, 
llm_candidates: list[PRData]) ->
             if remaining:
                 status_parts.append(f"{remaining} still running")
             console_print(
-                f"\n[info]{len(new_flagged)} new LLM-flagged "
-                f"{'PRs' if len(new_flagged) != 1 else 'PR'} ready for review "
+                f"\n[info]{len(new_flagged)} new "
+                f"{'PRs' if len(new_flagged) != 1 else 'PR'} with LLM issues 
found, ready for review "
                 f"({', '.join(status_parts)}):[/]\n"
             )
 
@@ -3874,7 +3976,7 @@ def _review_llm_flagged_prs(ctx: TriageContext, 
llm_candidates: list[PRData]) ->
         )
 
     console_print(
-        f"\n[info]LLM assessment complete: {len(ctx.llm_assessments)} flagged, 
"
+        f"\n[info]LLM assessment complete: {len(ctx.llm_assessments)} issues 
found, "
         f"{len(ctx.llm_passing)} passed, {len(ctx.llm_errors)} errors "
         f"(out of {len(ctx.llm_future_to_pr)} assessed).[/]\n"
     )
@@ -4855,8 +4957,8 @@ def _display_triage_summary(
 
     console_print(
         f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged 
!= 1 else 'PR'} "
-        f"flagged ({total_deterministic_flags} CI/conflicts/comments, "
-        f"{total_llm_flagged} LLM-flagged"
+        f"with issues found ({total_deterministic_flags} 
CI/conflicts/comments, "
+        f"{total_llm_flagged} LLM"
         f"{f', {total_llm_errors} LLM errors' if total_llm_errors else ''}"
         f"{f', {len(pending_approval)} awaiting workflow approval' if 
pending_approval else ''}"
         f"{f', {len(workflows_in_progress)} workflows in progress' if 
workflows_in_progress else ''}"
@@ -4881,12 +4983,12 @@ def _display_triage_summary(
         summary_table.add_row("  Commented (no response)", 
str(triaged_waiting_count))
         summary_table.add_row("  Triaged (author responded)", 
str(triaged_responded_count))
     summary_table.add_row("PRs assessed", str(len(candidate_prs)))
-    summary_table.add_row("Flagged by CI/conflicts/comments", 
str(total_deterministic_flags))
-    summary_table.add_row("Flagged by LLM", str(total_llm_flagged))
+    summary_table.add_row("Issues found by CI/conflicts/comments", 
str(total_deterministic_flags))
+    summary_table.add_row("Issues found by LLM", str(total_llm_flagged))
     if total_llm_report:
         summary_table.add_row("[red]Potentially flagged for report[/red]", 
f"[red]{total_llm_report}[/red]")
     summary_table.add_row("LLM errors (skipped)", str(total_llm_errors))
-    summary_table.add_row("Total flagged", str(total_flagged))
+    summary_table.add_row("Total issues found", str(total_flagged))
     summary_table.add_row("PRs passing all checks", str(len(passing_prs)))
     summary_table.add_row("Drafts with issues (skipped)", 
str(len(skipped_drafts)))
     summary_table.add_row(
@@ -5118,10 +5220,10 @@ _LOG_DOWNLOAD_MAX_BYTES = 5 * 1024 * 1024  # 5 MB
 
 def _fetch_failed_job_log_snippets(
     token: str, github_repository: str, head_sha: str, failed_check_names: 
list[str]
-) -> dict[str, str]:
+) -> dict[str, LogSnippetInfo]:
     """Fetch short log snippets from failed GitHub Actions jobs for a commit.
 
-    Returns a dict mapping failed check name -> log snippet (last N lines of 
the failed step).
+    Returns a dict mapping failed check name -> LogSnippetInfo (snippet + job 
URL).
     Only fetches logs for checks in ``failed_check_names`` to limit API calls.
     """
     import io
@@ -5141,7 +5243,7 @@ def _fetch_failed_job_log_snippets(
     if not failed_runs:
         return {}
 
-    snippets: dict[str, str] = {}
+    snippets: dict[str, LogSnippetInfo] = {}
     headers = {"Authorization": f"Bearer {token}", "Accept": 
"application/vnd.github+json"}
 
     for run in failed_runs:
@@ -5218,7 +5320,7 @@ def _fetch_failed_job_log_snippets(
             job_name = job.get("name", "")
             snippet = _extract_failed_step_snippet(zf, job_name, job)
             if snippet:
-                snippets[check_name] = snippet
+                snippets[check_name] = LogSnippetInfo(snippet=snippet, 
job_url=job.get("html_url", ""))
 
         zf.close()
 
@@ -5265,34 +5367,60 @@ def _extract_failed_step_snippet(zf, job_name: str, 
job: dict) -> str:
                     candidate_files.append(zname)
 
         for log_file in candidate_files:
-            try:
-                log_content = zf.read(log_file).decode("utf-8", 
errors="replace")
-            except (KeyError, OSError):
+            log_tail = _read_log_tail(zf, log_file)
+            if log_tail is None:
                 continue
 
-            snippet = _extract_error_lines(log_content, step_name)
+            snippet = _extract_error_lines(log_tail, step_name)
             if snippet:
                 return snippet
 
     # Fallback: try to find any log file for this job and extract errors
     for zname in zip_names:
         if job_name in zname or job_name.replace("/", "_") in zname:
-            try:
-                log_content = zf.read(zname).decode("utf-8", errors="replace")
-            except (KeyError, OSError):
+            log_tail = _read_log_tail(zf, zname)
+            if log_tail is None:
                 continue
-            snippet = _extract_error_lines(log_content, "")
+            snippet = _extract_error_lines(log_tail, "")
             if snippet:
                 return snippet
 
     return ""
 
 
+_LOG_TAIL_BYTES = 128 * 1024  # 128 KB — enough for ~1000 log lines
+_LOG_TAIL_LINES = 1000
+
+
+def _read_log_tail(zf, log_file: str) -> str | None:
+    """Read only the last ~1000 lines of a log file from a zip archive.
+
+    Returns the tail as a string, or None if the file cannot be read.
+    """
+    try:
+        raw = zf.read(log_file)
+    except (KeyError, OSError):
+        return None
+    # Only decode the tail to save memory on large log files
+    if len(raw) > _LOG_TAIL_BYTES:
+        raw = raw[-_LOG_TAIL_BYTES:]
+        # Drop the first (likely partial) line
+        nl = raw.find(b"\n")
+        if nl != -1:
+            raw = raw[nl + 1 :]
+    text = raw.decode("utf-8", errors="replace")
+    lines = text.splitlines()
+    if len(lines) > _LOG_TAIL_LINES:
+        lines = lines[-_LOG_TAIL_LINES:]
+    return "\n".join(lines)
+
+
 def _extract_error_lines(log_content: str, step_name: str) -> str:
     """Extract relevant error lines from a log file.
 
     Looks for error markers, then takes surrounding context.
     Falls back to the last N lines if no error markers are found.
+    The input is expected to already be truncated to the last ~1000 lines.
     """
     lines = log_content.splitlines()
     if not lines:
@@ -5310,7 +5438,6 @@ def _extract_error_lines(log_content: str, step_name: 
str) -> str:
         "AssertionError",
         "Exception:",
         "Traceback (most recent call last)",
-        "✗",
         "❌",
     ]
 
@@ -5323,14 +5450,14 @@ def _extract_error_lines(log_content: str, step_name: 
str) -> str:
             error_indices.append(i)
 
     if error_indices:
-        # Take a window around the first error cluster
-        first_error = error_indices[0]
+        # Take a window around the last error cluster (most relevant failure 
info)
         last_error = error_indices[-1]
+        first_error = error_indices[0]
 
-        # If errors are spread across the file, focus on the first cluster
-        for i in range(1, len(error_indices)):
+        # Walk backwards to find the start of the last cluster
+        for i in range(len(error_indices) - 1, 0, -1):
             if error_indices[i] - error_indices[i - 1] > 20:
-                last_error = error_indices[i - 1]
+                first_error = error_indices[i]
                 break
 
         start = max(0, first_error - 3)
@@ -5929,6 +6056,7 @@ def auto_triage(
         for label, get_dir in [
             ("review", _get_review_cache_dir),
             ("triage", _get_triage_cache_dir),
+            ("status", _get_status_cache_dir),
         ]:
             cache_dir = get_dir(github_repository)
             if cache_dir.exists():
@@ -5972,11 +6100,11 @@ def auto_triage(
     # Refresh collaborators cache in the background on every run
     _refresh_collaborators_cache_in_background(token, github_repository)
 
-    # Preload main branch CI failure information
-    main_failures = _fetch_recent_pr_failures(token, github_repository)
+    # Preload main branch CI failure information (cached for 4 hours)
+    main_failures = _cached_fetch_recent_pr_failures(token, github_repository)
 
-    # Show status of recent scheduled (canary) builds on main branch
-    canary_builds = _fetch_main_canary_builds(token, github_repository)
+    # Show status of recent scheduled (canary) builds on main branch (cached 
for 4 hours)
+    canary_builds = _cached_fetch_main_canary_builds(token, github_repository)
     _display_canary_builds_status(canary_builds, token, github_repository)
 
     # Resolve review-requested filter: --reviews-for-me uses authenticated 
user, --reviews-for uses specified users
@@ -6663,7 +6791,7 @@ def auto_triage(
         accepted_prs.extend(batch_accepted)
 
         if not candidate_prs:
-            console_print("[info]No candidates in this batch.[/]")
+            console_print("[info]No PRs to assess in this batch.[/]")
             _display_pr_overview_table(all_prs)
             continue
 
@@ -6879,7 +7007,7 @@ def auto_triage(
 
     enrich_total = t_enrich_end - t_enrich_start
     timing_table.add_row(
-        "Enrich candidates (checks + mergeability + comments)",
+        "Enrich PRs (checks + mergeability + comments)",
         _fmt_duration(enrich_total),
         str(len(candidate_prs)),
         _fmt_duration(enrich_total / num_candidates),
@@ -6978,7 +7106,17 @@ def auto_triage(
             if any(pr.number == pr_num for pr in skipped_drafts):
                 result = "[dim]draft-skipped[/]"
             elif pr_num in assessments or pr_num in llm_assessments:
-                result = "[red]flagged[/]"
+                action_for_result = pr_actions.get(pr_num, "")
+                if action_for_result == "drafted":
+                    result = "[yellow]drafted[/]"
+                elif action_for_result == "commented":
+                    result = "[yellow]commented[/]"
+                elif action_for_result == "closed":
+                    result = "[red]closed[/]"
+                elif action_for_result == "skipped":
+                    result = "[dim]skipped[/]"
+                else:
+                    result = "[yellow]issues found[/]"
             elif any(pr.number == pr_num for pr in passing_prs) or any(
                 pr.number == pr_num for pr in llm_passing
             ):

Reply via email to