This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a48cf93c68b Run LLM assessments in parallel with deterministic triage 
(#63319)
a48cf93c68b is described below

commit a48cf93c68bcb4afa115423c0a360b5824e3aeee
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Mar 11 14:34:09 2026 +0100

    Run LLM assessments in parallel with deterministic triage (#63319)
    
    Start LLM assessments as background futures using ThreadPoolExecutor,
    then process workflow approvals and deterministic flags while the LLM
    assessments run. Show LLM progress between interactive prompts. Present
    LLM-flagged PRs to the user as their assessments complete instead of
    blocking until all are done — while the user reviews each PR, more LLM
    results arrive in the background.
    
    Move workflow approval (formerly Phase 6) so that it runs immediately
    after LLM submission, as the new Phase 4b, to maximize parallel
    processing time.
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../src/airflow_breeze/commands/pr_commands.py     | 766 ++++++++++++++-------
 1 file changed, 502 insertions(+), 264 deletions(-)

diff --git a/dev/breeze/src/airflow_breeze/commands/pr_commands.py 
b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
index 13f8ce631d7..6e8bd6176fa 100644
--- a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 import sys
 import time
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 
 import click
@@ -1267,6 +1267,56 @@ def _resolve_unknown_mergeable(token: str, 
github_repository: str, prs: list[PRD
     return resolved
 
 
+def _llm_progress_status(completed: int, total: int, flagged: int, errors: 
int) -> str:
+    """Build a one-line LLM assessment progress string for display between 
prompts."""
+    if total == 0:
+        return ""
+    remaining = total - completed
+    parts = [f"{completed}/{total} done"]
+    if flagged:
+        parts.append(f"{flagged} flagged")
+    if errors:
+        parts.append(f"{errors} errors")
+    if remaining:
+        parts.append(f"{remaining} in progress")
+    return f"[dim]LLM assessment: {', '.join(parts)}[/]"
+
+
+def _collect_llm_results(
+    future_to_pr: dict,
+    llm_assessments: dict,
+    llm_completed: list[int],
+    llm_errors: list[int],
+    llm_passing: list,
+    block: bool = False,
+) -> None:
+    """Collect completed LLM futures. If block=False, only collect 
already-done futures."""
+    from concurrent.futures import wait
+
+    if not future_to_pr:
+        return
+
+    if block:
+        # Wait for all remaining futures
+        pending = [f for f in future_to_pr if not f.done()]
+        if pending:
+            wait(pending)
+
+    done_futures = [f for f in future_to_pr if f.done() and f not in 
llm_completed]
+    for future in done_futures:
+        llm_completed.append(future)
+        pr = future_to_pr[future]
+        assessment = future.result()
+        if assessment.error:
+            llm_errors.append(pr.number)
+            continue
+        if not assessment.should_flag:
+            llm_passing.append(pr)
+            get_console().print(f"  [success]PR {_pr_link(pr)} passes LLM 
quality check.[/]")
+            continue
+        llm_assessments[pr.number] = assessment
+
+
 def _fetch_pr_diff(token: str, github_repository: str, pr_number: int) -> str 
| None:
     """Fetch the diff for a PR via GitHub REST API. Returns the diff text or 
None on failure."""
     import requests
@@ -1848,9 +1898,13 @@ def auto_triage(
             f"{'PRs' if len(pending_approval) != 1 else 'PR'} awaiting 
workflow approval.[/]\n"
         )
 
-    # Phase 4: Run LLM assessments concurrently for PRs without CI failures
-    total_llm_errors = 0
-    pr_timings: dict[int, float] = {}  # PR number -> assessment duration in 
seconds
+    # Phase 4: Start LLM assessments in background (non-blocking) while we 
process deterministic flags
+    llm_future_to_pr: dict = {}
+    llm_assessments: dict[int, PRAssessment] = {}
+    llm_completed: list = []
+    llm_errors: list[int] = []
+    llm_passing: list[PRData] = []
+    llm_executor = None
 
     if not run_llm:
         if llm_candidates:
@@ -1864,62 +1918,25 @@ def auto_triage(
         if pending_approval:
             skipped_detail += f", {len(pending_approval)} awaiting workflow 
approval"
         get_console().print(
-            f"\n[info]Running LLM assessment for {len(llm_candidates)} "
-            f"{'PRs' if len(llm_candidates) != 1 else 'PR'} (skipped 
{skipped_detail})...[/]\n"
+            f"\n[info]Starting LLM assessment for {len(llm_candidates)} "
+            f"{'PRs' if len(llm_candidates) != 1 else 'PR'} in background "
+            f"(skipped {skipped_detail})...[/]\n"
         )
-        with ThreadPoolExecutor(max_workers=llm_concurrency) as executor:
-            future_to_pr = {}
-            future_start_times: dict[int, float] = {}
-            for pr in llm_candidates:
-                get_console().print(f"  [dim]Assessing PR {_pr_link(pr)}: 
{pr.title[:60]}...[/]")
-                future_start_times[pr.number] = time.monotonic()
-                future = executor.submit(
-                    assess_pr,
-                    pr_number=pr.number,
-                    pr_title=pr.title,
-                    pr_body=pr.body,
-                    check_status_summary=pr.check_summary,
-                    llm_model=llm_model,
-                )
-                future_to_pr[future] = pr
-            for future in as_completed(future_to_pr):
-                pr = future_to_pr[future]
-                pr_timings[pr.number] = time.monotonic() - 
future_start_times[pr.number]
-                assessment = future.result()
-                if assessment.error:
-                    total_llm_errors += 1
-                    continue
-                if not assessment.should_flag:
-                    get_console().print(
-                        f"  [success]PR {_pr_link(pr)}: {pr.title[:60]} — 
passes quality check "
-                        f"({_fmt_duration(pr_timings[pr.number])}).[/]"
-                    )
-                    passing_prs.append(pr)
-                    continue
-                get_console().print(
-                    f"  [red]PR {_pr_link(pr)}: {pr.title[:60]} — flagged by 
LLM "
-                    f"({_fmt_duration(pr_timings[pr.number])}).[/]"
-                )
-                assessments[pr.number] = assessment
-
-    total_flagged = len(assessments)
-    summary_parts = [
-        f"{total_deterministic_flags} CI/conflicts/comments",
-        f"{total_flagged - total_deterministic_flags} LLM-flagged",
-    ]
-    if pending_approval:
-        summary_parts.append(f"{len(pending_approval)} awaiting workflow 
approval")
-    if total_llm_errors:
-        summary_parts.append(f"{total_llm_errors} LLM errors")
-    get_console().print(
-        f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged 
!= 1 else 'PR'} "
-        f"flagged ({', '.join(summary_parts)}).[/]\n"
-    )
-
-    t_phase4_end = time.monotonic()
+        llm_executor = ThreadPoolExecutor(max_workers=llm_concurrency)
+        llm_future_to_pr = {
+            llm_executor.submit(
+                assess_pr,
+                pr_number=pr.number,
+                pr_title=pr.title,
+                pr_body=pr.body,
+                check_status_summary=pr.check_summary,
+                llm_model=llm_model,
+            ): pr
+            for pr in llm_candidates
+        }
 
-    # Phase 5: Present flagged PRs interactively, grouped by author
-    t_phase5_start = time.monotonic()
+    # Phase 4b: Present NOT_RUN PRs for workflow approval while LLM runs
+    total_workflows_approved = 0
     total_converted = 0
     total_commented = 0
     total_closed = 0
@@ -1928,197 +1945,36 @@ def auto_triage(
     pr_actions: dict[int, str] = {}  # PR number -> action taken by user
 
     quit_early = False
-
-    # Build sorted list of flagged PRs grouped by author
-    flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if 
pr.number in assessments]
-    flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(), 
pair[0].number))
+    # author_flagged_count is populated below after deterministic flagged PRs 
are built,
+    # but we need a preliminary version for the workflow approval phase
     from collections import Counter
 
-    author_flagged_count: dict[str, int] = dict(Counter(pr.author_login for 
pr, _ in flagged_prs))
-
-    current_author: str | None = None
-    for pr, assessment in flagged_prs:
-        if pr.author_login != current_author:
-            current_author = pr.author_login
-            count = author_flagged_count[current_author]
-            get_console().print()
-            get_console().rule(
-                f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if 
count != 1 else ''})",
-                style="cyan",
-            )
-
-        # Fetch author profile for context (only for flagged PRs)
-        author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
-
-        comment = _build_comment(
-            pr.author_login, assessment.violations, pr.number, 
pr.commits_behind, pr.base_ref
-        )
-        comment_only = _build_comment(
-            pr.author_login,
-            assessment.violations,
-            pr.number,
-            pr.commits_behind,
-            pr.base_ref,
-            comment_only=True,
-        )
-        close_comment = _build_close_comment(
-            pr.author_login,
-            assessment.violations,
-            pr.number,
-            author_flagged_count.get(pr.author_login, 0),
-        )
-        _display_pr_panel(pr, author_profile, assessment, comment)
-
-        default_action, reason = _compute_default_action(pr, assessment, 
author_flagged_count)
-        if default_action == TriageAction.CLOSE:
-            get_console().print(Panel(close_comment, title="Proposed close 
comment", border_style="red"))
-        get_console().print(f"  [bold]{reason}[/]")
-
-        if dry_run:
-            action_label = {
-                TriageAction.DRAFT: "draft",
-                TriageAction.COMMENT: "add comment",
-                TriageAction.CLOSE: "close",
-                TriageAction.READY: "ready",
-                TriageAction.SKIP: "skip",
-            }[default_action]
-            get_console().print(f"[warning]Dry run — would default to: 
{action_label}[/]")
-            continue
-
-        action = prompt_triage_action(
-            f"Action for PR {_pr_link(pr)}?",
-            default=default_action,
-            forced_answer=answer_triage,
-        )
-
-        if action == TriageAction.QUIT:
-            get_console().print("[warning]Quitting.[/]")
-            quit_early = True
-            break
-
-        if action == TriageAction.SKIP:
-            get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
-            total_skipped_action += 1
-            pr_actions[pr.number] = "skipped"
-            continue
-
-        if action == TriageAction.READY:
-            get_console().print(
-                f"  [info]Marking PR {_pr_link(pr)} as ready — adding 
'{_READY_FOR_REVIEW_LABEL}' label.[/]"
-            )
-            if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
-                get_console().print(
-                    f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' added to PR 
{_pr_link(pr)}.[/]"
-                )
-                total_ready += 1
-                pr_actions[pr.number] = "ready"
-            else:
-                get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
-            continue
-
-        if action == TriageAction.COMMENT:
-            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
-            if _post_comment(token, pr.node_id, comment_only):
-                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                total_commented += 1
-                pr_actions[pr.number] = "commented"
-            else:
-                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
-            continue
-
-        if action == TriageAction.DRAFT:
-            get_console().print(f"  Converting PR {_pr_link(pr)} to draft...")
-            if _convert_pr_to_draft(token, pr.node_id):
-                get_console().print(f"  [success]PR {_pr_link(pr)} converted 
to draft.[/]")
-            else:
-                get_console().print(f"  [error]Failed to convert PR 
{_pr_link(pr)} to draft.[/]")
-                continue
-
-            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
-            if _post_comment(token, pr.node_id, comment):
-                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                total_converted += 1
-                pr_actions[pr.number] = "drafted"
-            else:
-                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
-            continue
-
-        if action == TriageAction.CLOSE:
-            get_console().print(f"  Closing PR {_pr_link(pr)}...")
-            if _close_pr(token, pr.node_id):
-                get_console().print(f"  [success]PR {_pr_link(pr)} closed.[/]")
-            else:
-                get_console().print(f"  [error]Failed to close PR 
{_pr_link(pr)}.[/]")
-                continue
-
-            if _add_label(token, github_repository, pr.node_id, 
_CLOSED_QUALITY_LABEL):
-                get_console().print(
-                    f"  [success]Label '{_CLOSED_QUALITY_LABEL}' added to PR 
{_pr_link(pr)}.[/]"
-                )
-            else:
-                get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
-
-            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
-            if _post_comment(token, pr.node_id, close_comment):
-                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                total_closed += 1
-                pr_actions[pr.number] = "closed"
-            else:
-                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
-
-    # Phase 5b: Present passing PRs for optional ready-for-review marking
-    if not quit_early and passing_prs:
-        passing_prs.sort(key=lambda p: (p.author_login.lower(), p.number))
-        get_console().print(
-            f"\n[info]{len(passing_prs)} {'PRs pass' if len(passing_prs) != 1 
else 'PR passes'} "
-            f"all checks — review to mark as ready:[/]\n"
-        )
-        for pr in passing_prs:
-            author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
-            _display_pr_info_panels(pr, author_profile)
-
-            if dry_run:
-                get_console().print("[warning]Dry run — skipping.[/]")
-                continue
-
-            action = prompt_triage_action(
-                f"Action for PR {_pr_link(pr)}?",
-                default=TriageAction.SKIP,
-                forced_answer=answer_triage,
-            )
-
-            if action == TriageAction.QUIT:
-                get_console().print("[warning]Quitting.[/]")
-                quit_early = True
-                break
-
-            if action == TriageAction.READY:
-                get_console().print(
-                    f"  [info]Marking PR {_pr_link(pr)} as ready "
-                    f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
-                )
-                if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
-                    get_console().print(
-                        f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' added 
to PR {_pr_link(pr)}.[/]"
-                    )
-                    total_ready += 1
-                    pr_actions[pr.number] = "ready"
-                else:
-                    get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
-            else:
-                get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
-                total_skipped_action += 1
-                pr_actions[pr.number] = "skipped"
+    author_flagged_count: dict[str, int] = dict(
+        Counter(pr.author_login for pr in candidate_prs if pr.number in 
assessments)
+    )
 
-    # Phase 6: Present NOT_RUN PRs for workflow approval
-    total_workflows_approved = 0
     if not quit_early and pending_approval:
         pending_approval.sort(key=lambda p: (p.author_login.lower(), p.number))
         get_console().print(
             f"\n[info]{len(pending_approval)} {'PRs have' if 
len(pending_approval) != 1 else 'PR has'} "
-            f"no test workflows run — review and approve workflow runs:[/]\n"
+            f"no test workflows run — review and approve workflow runs"
+            f"{' (LLM assessments running in background)' if llm_future_to_pr 
else ''}:[/]\n"
         )
         for pr in pending_approval:
+            # Collect any completed LLM results and show progress
+            if llm_future_to_pr:
+                _collect_llm_results(
+                    llm_future_to_pr, llm_assessments, llm_completed, 
llm_errors, llm_passing
+                )
+                progress = _llm_progress_status(
+                    len(llm_completed),
+                    len(llm_future_to_pr),
+                    len(llm_assessments),
+                    len(llm_errors),
+                )
+                if progress:
+                    get_console().print(progress)
+
             author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
             pending_runs = _find_pending_workflow_runs(token, 
github_repository, pr.head_sha)
             _display_workflow_approval_panel(pr, author_profile, pending_runs)
@@ -2173,7 +2029,6 @@ def auto_triage(
                         get_console().print(f"  [error]Failed to post comment 
on PR {_pr_link(pr)}.[/]")
                     continue
                 # For DRAFT or READY, fall through to normal workflow approval
-                # (approve workflows first, then triage later)
 
             if dry_run:
                 get_console().print("[warning]Dry run — skipping workflow 
approval.[/]")
@@ -2282,6 +2137,389 @@ def auto_triage(
             else:
                 get_console().print(f"  [error]Failed to approve workflow runs 
for PR {_pr_link(pr)}.[/]")
 
+    # Phase 5a: Present deterministically flagged PRs for interactive review
+    # LLM assessments continue running in background during this phase
+
+    # Build sorted list of deterministic-flagged PRs grouped by author
+    det_flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if 
pr.number in assessments]
+    det_flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(), 
pair[0].number))
+
+    if det_flagged_prs:
+        get_console().print(
+            f"\n[info]Reviewing {len(det_flagged_prs)} deterministically 
flagged "
+            f"{'PRs' if len(det_flagged_prs) != 1 else 'PR'}"
+            f"{' (LLM assessments running in background)' if llm_future_to_pr 
else ''}...[/]\n"
+        )
+
+    current_author: str | None = None
+    for pr, assessment in det_flagged_prs:
+        if pr.author_login != current_author:
+            current_author = pr.author_login
+            count = author_flagged_count[current_author]
+            get_console().print()
+            get_console().rule(
+                f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if 
count != 1 else ''})",
+                style="cyan",
+            )
+
+        # Collect any completed LLM results and show progress
+        if llm_future_to_pr:
+            _collect_llm_results(llm_future_to_pr, llm_assessments, 
llm_completed, llm_errors, llm_passing)
+            progress = _llm_progress_status(
+                len(llm_completed),
+                len(llm_future_to_pr),
+                len(llm_assessments),
+                len(llm_errors),
+            )
+            if progress:
+                get_console().print(progress)
+
+        # Fetch author profile for context (only for flagged PRs)
+        author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
+
+        comment = _build_comment(
+            pr.author_login, assessment.violations, pr.number, 
pr.commits_behind, pr.base_ref
+        )
+        close_comment = _build_close_comment(
+            pr.author_login,
+            assessment.violations,
+            pr.number,
+            author_flagged_count.get(pr.author_login, 0),
+        )
+        _display_pr_panel(pr, author_profile, assessment, comment)
+
+        default_action, reason = _compute_default_action(pr, assessment, 
author_flagged_count)
+        if default_action == TriageAction.CLOSE:
+            get_console().print(Panel(close_comment, title="Proposed close 
comment", border_style="red"))
+        get_console().print(f"  [bold]{reason}[/]")
+
+        if dry_run:
+            action_label = {
+                TriageAction.DRAFT: "draft",
+                TriageAction.CLOSE: "close",
+                TriageAction.READY: "ready",
+                TriageAction.SKIP: "skip",
+            }[default_action]
+            get_console().print(f"[warning]Dry run — would default to: 
{action_label}[/]")
+            continue
+
+        action = prompt_triage_action(
+            f"Action for PR {_pr_link(pr)}?",
+            default=default_action,
+            forced_answer=answer_triage,
+        )
+
+        if action == TriageAction.QUIT:
+            get_console().print("[warning]Quitting.[/]")
+            quit_early = True
+            break
+
+        if action == TriageAction.SKIP:
+            get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
+            total_skipped_action += 1
+            pr_actions[pr.number] = "skipped"
+            continue
+
+        if action == TriageAction.READY:
+            get_console().print(
+                f"  [info]Marking PR {_pr_link(pr)} as ready — adding 
'{_READY_FOR_REVIEW_LABEL}' label.[/]"
+            )
+            if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
+                get_console().print(
+                    f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' added to PR 
{_pr_link(pr)}.[/]"
+                )
+                total_ready += 1
+                pr_actions[pr.number] = "ready"
+            else:
+                get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
+            continue
+
+        if action == TriageAction.DRAFT:
+            get_console().print(f"  Converting PR {_pr_link(pr)} to draft...")
+            if _convert_pr_to_draft(token, pr.node_id):
+                get_console().print(f"  [success]PR {_pr_link(pr)} converted 
to draft.[/]")
+            else:
+                get_console().print(f"  [error]Failed to convert PR 
{_pr_link(pr)} to draft.[/]")
+                continue
+
+            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
+            if _post_comment(token, pr.node_id, comment):
+                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
+                total_converted += 1
+                pr_actions[pr.number] = "drafted"
+            else:
+                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
+            continue
+
+        if action == TriageAction.CLOSE:
+            get_console().print(f"  Closing PR {_pr_link(pr)}...")
+            if _close_pr(token, pr.node_id):
+                get_console().print(f"  [success]PR {_pr_link(pr)} closed.[/]")
+            else:
+                get_console().print(f"  [error]Failed to close PR 
{_pr_link(pr)}.[/]")
+                continue
+
+            if _add_label(token, github_repository, pr.node_id, 
_CLOSED_QUALITY_LABEL):
+                get_console().print(
+                    f"  [success]Label '{_CLOSED_QUALITY_LABEL}' added to PR 
{_pr_link(pr)}.[/]"
+                )
+            else:
+                get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
+
+            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
+            if _post_comment(token, pr.node_id, close_comment):
+                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
+                total_closed += 1
+                pr_actions[pr.number] = "closed"
+            else:
+                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
+
+    # Phase 5b: Present LLM-flagged PRs as they become ready (no blocking wait)
+    total_llm_errors = 0
+    llm_presented: set[int] = set()  # PR numbers already presented to user
+    if not quit_early and llm_future_to_pr:
+        # Collect whatever has completed so far
+        _collect_llm_results(llm_future_to_pr, llm_assessments, llm_completed, 
llm_errors, llm_passing)
+
+        # Loop: present flagged PRs as they arrive, keep collecting new results
+        while not quit_early:
+            # Find newly flagged PRs not yet presented
+            new_flagged = [
+                (pr, llm_assessments[pr.number])
+                for pr in llm_candidates
+                if pr.number in llm_assessments and pr.number not in 
llm_presented
+            ]
+            new_flagged.sort(key=lambda pair: (pair[0].author_login.lower(), 
pair[0].number))
+
+            if new_flagged:
+                remaining = len(llm_future_to_pr) - len(llm_completed)
+                status_parts = [f"{len(llm_completed)}/{len(llm_future_to_pr)} 
done"]
+                if remaining:
+                    status_parts.append(f"{remaining} still running")
+                get_console().print(
+                    f"\n[info]{len(new_flagged)} new LLM-flagged "
+                    f"{'PRs' if len(new_flagged) != 1 else 'PR'} ready for 
review "
+                    f"({', '.join(status_parts)}):[/]\n"
+                )
+
+                for pr, assessment in new_flagged:
+                    llm_presented.add(pr.number)
+                    author_flagged_count[pr.author_login] = 
author_flagged_count.get(pr.author_login, 0) + 1
+
+                    author_profile = _fetch_author_profile(token, 
pr.author_login, github_repository)
+
+                    comment = _build_comment(
+                        pr.author_login, assessment.violations, pr.number, 
pr.commits_behind, pr.base_ref
+                    )
+                    comment_only = _build_comment(
+                        pr.author_login,
+                        assessment.violations,
+                        pr.number,
+                        pr.commits_behind,
+                        pr.base_ref,
+                        comment_only=True,
+                    )
+                    close_comment = _build_close_comment(
+                        pr.author_login,
+                        assessment.violations,
+                        pr.number,
+                        author_flagged_count.get(pr.author_login, 0),
+                    )
+                    _display_pr_panel(pr, author_profile, assessment, comment)
+
+                    default_action, reason = _compute_default_action(pr, 
assessment, author_flagged_count)
+                    if default_action == TriageAction.CLOSE:
+                        get_console().print(
+                            Panel(close_comment, title="Proposed close 
comment", border_style="red")
+                        )
+                    get_console().print(f"  [bold]{reason}[/]")
+
+                    if dry_run:
+                        action_label = {
+                            TriageAction.DRAFT: "draft",
+                            TriageAction.COMMENT: "add comment",
+                            TriageAction.CLOSE: "close",
+                            TriageAction.READY: "ready",
+                            TriageAction.SKIP: "skip",
+                        }[default_action]
+                        get_console().print(f"[warning]Dry run — would default 
to: {action_label}[/]")
+                        # Collect more results while in dry-run
+                        _collect_llm_results(
+                            llm_future_to_pr, llm_assessments, llm_completed, 
llm_errors, llm_passing
+                        )
+                        continue
+
+                    action = prompt_triage_action(
+                        f"Action for PR {_pr_link(pr)}?",
+                        default=default_action,
+                        forced_answer=answer_triage,
+                    )
+
+                    # While user was deciding, more results may have arrived
+                    _collect_llm_results(
+                        llm_future_to_pr, llm_assessments, llm_completed, 
llm_errors, llm_passing
+                    )
+
+                    if action == TriageAction.QUIT:
+                        get_console().print("[warning]Quitting.[/]")
+                        quit_early = True
+                        break
+
+                    if action == TriageAction.SKIP:
+                        get_console().print(f"  [info]Skipping PR 
{_pr_link(pr)} — no action taken.[/]")
+                        total_skipped_action += 1
+                        continue
+
+                    if action == TriageAction.READY:
+                        get_console().print(
+                            f"  [info]Marking PR {_pr_link(pr)} as ready "
+                            f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
+                        )
+                        if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
+                            get_console().print(
+                                f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' 
added to PR {_pr_link(pr)}.[/]"
+                            )
+                            total_ready += 1
+                        else:
+                            get_console().print(f"  [warning]Failed to add 
label to PR {_pr_link(pr)}.[/]")
+                        continue
+
+                    if action == TriageAction.COMMENT:
+                        get_console().print(f"  Posting comment on PR 
{_pr_link(pr)}...")
+                        if _post_comment(token, pr.node_id, comment_only):
+                            get_console().print(f"  [success]Comment posted on 
PR {_pr_link(pr)}.[/]")
+                            total_commented += 1
+                        else:
+                            get_console().print(f"  [error]Failed to post 
comment on PR {_pr_link(pr)}.[/]")
+                        continue
+
+                    if action == TriageAction.DRAFT:
+                        get_console().print(f"  Converting PR {_pr_link(pr)} 
to draft...")
+                        if _convert_pr_to_draft(token, pr.node_id):
+                            get_console().print(f"  [success]PR {_pr_link(pr)} 
converted to draft.[/]")
+                        else:
+                            get_console().print(f"  [error]Failed to convert 
PR {_pr_link(pr)} to draft.[/]")
+                            continue
+
+                        get_console().print(f"  Posting comment on PR 
{_pr_link(pr)}...")
+                        if _post_comment(token, pr.node_id, comment):
+                            get_console().print(f"  [success]Comment posted on 
PR {_pr_link(pr)}.[/]")
+                            total_converted += 1
+                        else:
+                            get_console().print(f"  [error]Failed to post 
comment on PR {_pr_link(pr)}.[/]")
+                        continue
+
+                    if action == TriageAction.CLOSE:
+                        get_console().print(f"  Closing PR {_pr_link(pr)}...")
+                        if _close_pr(token, pr.node_id):
+                            get_console().print(f"  [success]PR {_pr_link(pr)} 
closed.[/]")
+                        else:
+                            get_console().print(f"  [error]Failed to close PR 
{_pr_link(pr)}.[/]")
+                            continue
+
+                        if _add_label(token, github_repository, pr.node_id, 
_CLOSED_QUALITY_LABEL):
+                            get_console().print(
+                                f"  [success]Label '{_CLOSED_QUALITY_LABEL}' 
added to PR {_pr_link(pr)}.[/]"
+                            )
+                        else:
+                            get_console().print(f"  [warning]Failed to add 
label to PR {_pr_link(pr)}.[/]")
+
+                        get_console().print(f"  Posting comment on PR 
{_pr_link(pr)}...")
+                        if _post_comment(token, pr.node_id, close_comment):
+                            get_console().print(f"  [success]Comment posted on 
PR {_pr_link(pr)}.[/]")
+                            total_closed += 1
+                        else:
+                            get_console().print(f"  [error]Failed to post 
comment on PR {_pr_link(pr)}.[/]")
+
+            # Check if all futures are done
+            if len(llm_completed) >= len(llm_future_to_pr):
+                break
+
+            # Still pending — wait briefly for more results, then loop back
+            get_console().print(
+                f"[dim]Waiting for {len(llm_future_to_pr) - 
len(llm_completed)} "
+                f"remaining LLM {'assessments' if len(llm_future_to_pr) - 
len(llm_completed) != 1 else 'assessment'}...[/]"
+            )
+            time.sleep(2)
+            _collect_llm_results(llm_future_to_pr, llm_assessments, 
llm_completed, llm_errors, llm_passing)
+
+        total_llm_errors = len(llm_errors)
+        get_console().print(
+            f"\n[info]LLM assessment complete: {len(llm_assessments)} flagged, 
"
+            f"{len(llm_passing)} passed, {total_llm_errors} errors "
+            f"(out of {len(llm_future_to_pr)} assessed).[/]\n"
+        )
+
+        # Add LLM passing PRs to the passing list
+        passing_prs.extend(llm_passing)
+
+    # Phase 5c: Present passing PRs for optional ready-for-review marking
+    if not quit_early and passing_prs:
+        passing_prs.sort(key=lambda p: (p.author_login.lower(), p.number))
+        get_console().print(
+            f"\n[info]{len(passing_prs)} {'PRs pass' if len(passing_prs) != 1 
else 'PR passes'} "
+            f"all checks — review to mark as ready:[/]\n"
+        )
+        for pr in passing_prs:
+            author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
+            _display_pr_info_panels(pr, author_profile)
+
+            if dry_run:
+                get_console().print("[warning]Dry run — skipping.[/]")
+                continue
+
+            action = prompt_triage_action(
+                f"Action for PR {_pr_link(pr)}?",
+                default=TriageAction.SKIP,
+                forced_answer=answer_triage,
+            )
+
+            if action == TriageAction.QUIT:
+                get_console().print("[warning]Quitting.[/]")
+                quit_early = True
+                break
+
+            if action == TriageAction.SKIP:
+                get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
+                total_skipped_action += 1
+                pr_actions[pr.number] = "skipped"
+                continue
+
+            if action == TriageAction.READY:
+                get_console().print(
+                    f"  [info]Marking PR {_pr_link(pr)} as ready "
+                    f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
+                )
+                if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
+                    get_console().print(
+                        f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' added 
to PR {_pr_link(pr)}.[/]"
+                    )
+                    total_ready += 1
+                    pr_actions[pr.number] = "ready"
+                else:
+                    get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
+            else:
+                get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
+                total_skipped_action += 1
+                pr_actions[pr.number] = "skipped"
+
+    # Shut down LLM executor if it was started
+    if llm_executor is not None:
+        llm_executor.shutdown(wait=False, cancel_futures=True)
+
+    # Combine flagged counts for summary
+    total_flagged = total_deterministic_flags + len(llm_assessments)
+    total_llm_flagged = len(llm_assessments)
+
+    get_console().print(
+        f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged 
!= 1 else 'PR'} "
+        f"flagged ({total_deterministic_flags} CI/conflicts/comments, "
+        f"{total_llm_flagged} LLM-flagged"
+        f"{f', {total_llm_errors} LLM errors' if total_llm_errors else ''}"
+        f"{f', {len(pending_approval)} awaiting workflow approval' if 
pending_approval else ''}"
+        f").[/]\n"
+    )
+
     # Summary
     t_total_end = time.monotonic()
     get_console().print()
@@ -2297,7 +2535,7 @@ def auto_triage(
     summary_table.add_row("PRs skipped (filtered)", str(total_skipped))
     summary_table.add_row("PRs assessed", str(len(candidate_prs)))
     summary_table.add_row("Flagged by CI/conflicts/comments", 
str(total_deterministic_flags))
-    summary_table.add_row("Flagged by LLM", str(total_flagged - 
total_deterministic_flags))
+    summary_table.add_row("Flagged by LLM", str(total_llm_flagged))
     summary_table.add_row("LLM errors (skipped)", str(total_llm_errors))
     summary_table.add_row("Total flagged", str(total_flagged))
     summary_table.add_row("PRs passing all checks", str(len(passing_prs)))
@@ -2367,23 +2605,25 @@ def auto_triage(
     else:
         timing_table.add_row("Deterministic triage", "[dim]—[/]", "0", 
"[dim]—[/]", "[dim]—[/]", "[dim]—[/]")
 
-    if pr_timings:
-        llm_values = list(pr_timings.values())
-        llm_total = sum(llm_values)
+    llm_count = len(llm_completed)
+    llm_wall_time = t_total_end - t_phase2c_end  # LLM runs in background 
across all interactive phases
+    if llm_count:
         timing_table.add_row(
-            "LLM assessment",
-            _fmt_duration(t_phase4_end - t_phase2c_end),
-            str(len(llm_values)),
-            _fmt_duration(llm_total / len(llm_values)),
-            _fmt_duration(min(llm_values)),
-            _fmt_duration(max(llm_values)),
+            "LLM assessment (background)",
+            _fmt_duration(llm_wall_time),
+            str(llm_count),
+            "[dim]—[/]",
+            "[dim]—[/]",
+            "[dim]—[/]",
         )
     else:
-        timing_table.add_row("LLM assessment", "[dim]—[/]", "0", "[dim]—[/]", 
"[dim]—[/]", "[dim]—[/]")
+        timing_table.add_row(
+            "LLM assessment (background)", "[dim]—[/]", "0", "[dim]—[/]", 
"[dim]—[/]", "[dim]—[/]"
+        )
 
     timing_table.add_row(
-        "Interactive review",
-        _fmt_duration(t_total_end - t_phase5_start),
+        "Interactive review (overlaps LLM)",
+        _fmt_duration(t_total_end - t_phase2c_end),
         "",
         "",
         "",
@@ -2400,7 +2640,7 @@ def auto_triage(
     )
     get_console().print(timing_table)
 
-    if deterministic_timings or pr_timings:
+    if deterministic_timings:
         pr_titles = {pr.number: pr.title for pr in candidate_prs}
         # Amortize batch fetch time evenly across candidate PRs
         num_candidates = len(candidate_prs) or 1
@@ -2423,24 +2663,23 @@ def auto_triage(
         pr_timing_table.add_column("Action")
         pr_timing_table.add_column("Fetch (avg)", justify="right")
         pr_timing_table.add_column("Deterministic", justify="right")
-        pr_timing_table.add_column("LLM", justify="right")
         pr_timing_table.add_column("Total", justify="right")
 
-        # Collect all PR numbers that went through any phase
         all_pr_numbers = sorted(
-            deterministic_timings.keys() | pr_timings.keys(),
-            key=lambda n: deterministic_timings.get(n, 0) + pr_timings.get(n, 
0) + fetch_per_pr,
+            deterministic_timings.keys(),
+            key=lambda n: deterministic_timings.get(n, 0) + fetch_per_pr,
             reverse=True,
         )
         for pr_num in all_pr_numbers:
             title = pr_titles.get(pr_num, "")[:60]
             det_time = deterministic_timings.get(pr_num, 0)
-            llm_time = pr_timings.get(pr_num, 0)
-            total_time = fetch_per_pr + det_time + llm_time
+            total_time = fetch_per_pr + det_time
 
-            if pr_num in assessments:
+            if pr_num in assessments or pr_num in llm_assessments:
                 result = "[red]flagged[/]"
-            elif any(pr.number == pr_num for pr in passing_prs):
+            elif any(pr.number == pr_num for pr in passing_prs) or any(
+                pr.number == pr_num for pr in llm_passing
+            ):
                 result = "[success]passed[/]"
             elif any(pr.number == pr_num for pr in pending_approval):
                 result = "[dim]pending[/]"
@@ -2457,7 +2696,6 @@ def auto_triage(
                 action_display,
                 _fmt_duration(fetch_per_pr),
                 _fmt_duration(det_time) if det_time else "[dim]—[/]",
-                _fmt_duration(llm_time) if llm_time else "[dim]—[/]",
                 _fmt_duration(total_time),
             )
         get_console().print(pr_timing_table)


Reply via email to