This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a48cf93c68b Run LLM assessments in parallel with deterministic triage
(#63319)
a48cf93c68b is described below
commit a48cf93c68bcb4afa115423c0a360b5824e3aeee
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Mar 11 14:34:09 2026 +0100
Run LLM assessments in parallel with deterministic triage (#63319)
Start LLM assessments as background futures using ThreadPoolExecutor,
then process workflow approvals and deterministic flags while LLM runs.
Show LLM progress between interactive prompts. Present LLM-flagged PRs
to the user as they complete instead of blocking — while the user
reviews each PR, more LLM results arrive in the background.
Move workflow approval (formerly Phase 6) to run immediately after
LLM submission (Phase 4b) to maximize parallel processing time.
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../src/airflow_breeze/commands/pr_commands.py | 766 ++++++++++++++-------
1 file changed, 502 insertions(+), 264 deletions(-)
diff --git a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
index 13f8ce631d7..6e8bd6176fa 100644
--- a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import sys
import time
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import click
@@ -1267,6 +1267,56 @@ def _resolve_unknown_mergeable(token: str,
github_repository: str, prs: list[PRD
return resolved
+def _llm_progress_status(completed: int, total: int, flagged: int, errors:
int) -> str:
+ """Build a one-line LLM assessment progress string for display between
prompts."""
+ if total == 0:
+ return ""
+ remaining = total - completed
+ parts = [f"{completed}/{total} done"]
+ if flagged:
+ parts.append(f"{flagged} flagged")
+ if errors:
+ parts.append(f"{errors} errors")
+ if remaining:
+ parts.append(f"{remaining} in progress")
+ return f"[dim]LLM assessment: {', '.join(parts)}[/]"
+
+
+def _collect_llm_results(
+ future_to_pr: dict,
+ llm_assessments: dict,
+ llm_completed: list[int],
+ llm_errors: list[int],
+ llm_passing: list,
+ block: bool = False,
+) -> None:
+ """Collect completed LLM futures. If block=False, only collect
already-done futures."""
+ from concurrent.futures import wait
+
+ if not future_to_pr:
+ return
+
+ if block:
+ # Wait for all remaining futures
+ pending = [f for f in future_to_pr if not f.done()]
+ if pending:
+ wait(pending)
+
+ done_futures = [f for f in future_to_pr if f.done() and f not in
llm_completed]
+ for future in done_futures:
+ llm_completed.append(future)
+ pr = future_to_pr[future]
+ assessment = future.result()
+ if assessment.error:
+ llm_errors.append(pr.number)
+ continue
+ if not assessment.should_flag:
+ llm_passing.append(pr)
+ get_console().print(f" [success]PR {_pr_link(pr)} passes LLM
quality check.[/]")
+ continue
+ llm_assessments[pr.number] = assessment
+
+
def _fetch_pr_diff(token: str, github_repository: str, pr_number: int) -> str
| None:
"""Fetch the diff for a PR via GitHub REST API. Returns the diff text or
None on failure."""
import requests
@@ -1848,9 +1898,13 @@ def auto_triage(
f"{'PRs' if len(pending_approval) != 1 else 'PR'} awaiting
workflow approval.[/]\n"
)
- # Phase 4: Run LLM assessments concurrently for PRs without CI failures
- total_llm_errors = 0
- pr_timings: dict[int, float] = {} # PR number -> assessment duration in
seconds
+ # Phase 4: Start LLM assessments in background (non-blocking) while we
process deterministic flags
+ llm_future_to_pr: dict = {}
+ llm_assessments: dict[int, PRAssessment] = {}
+ llm_completed: list = []
+ llm_errors: list[int] = []
+ llm_passing: list[PRData] = []
+ llm_executor = None
if not run_llm:
if llm_candidates:
@@ -1864,62 +1918,25 @@ def auto_triage(
if pending_approval:
skipped_detail += f", {len(pending_approval)} awaiting workflow
approval"
get_console().print(
- f"\n[info]Running LLM assessment for {len(llm_candidates)} "
- f"{'PRs' if len(llm_candidates) != 1 else 'PR'} (skipped
{skipped_detail})...[/]\n"
+ f"\n[info]Starting LLM assessment for {len(llm_candidates)} "
+ f"{'PRs' if len(llm_candidates) != 1 else 'PR'} in background "
+ f"(skipped {skipped_detail})...[/]\n"
)
- with ThreadPoolExecutor(max_workers=llm_concurrency) as executor:
- future_to_pr = {}
- future_start_times: dict[int, float] = {}
- for pr in llm_candidates:
- get_console().print(f" [dim]Assessing PR {_pr_link(pr)}:
{pr.title[:60]}...[/]")
- future_start_times[pr.number] = time.monotonic()
- future = executor.submit(
- assess_pr,
- pr_number=pr.number,
- pr_title=pr.title,
- pr_body=pr.body,
- check_status_summary=pr.check_summary,
- llm_model=llm_model,
- )
- future_to_pr[future] = pr
- for future in as_completed(future_to_pr):
- pr = future_to_pr[future]
- pr_timings[pr.number] = time.monotonic() -
future_start_times[pr.number]
- assessment = future.result()
- if assessment.error:
- total_llm_errors += 1
- continue
- if not assessment.should_flag:
- get_console().print(
- f" [success]PR {_pr_link(pr)}: {pr.title[:60]} —
passes quality check "
- f"({_fmt_duration(pr_timings[pr.number])}).[/]"
- )
- passing_prs.append(pr)
- continue
- get_console().print(
- f" [red]PR {_pr_link(pr)}: {pr.title[:60]} — flagged by
LLM "
- f"({_fmt_duration(pr_timings[pr.number])}).[/]"
- )
- assessments[pr.number] = assessment
-
- total_flagged = len(assessments)
- summary_parts = [
- f"{total_deterministic_flags} CI/conflicts/comments",
- f"{total_flagged - total_deterministic_flags} LLM-flagged",
- ]
- if pending_approval:
- summary_parts.append(f"{len(pending_approval)} awaiting workflow
approval")
- if total_llm_errors:
- summary_parts.append(f"{total_llm_errors} LLM errors")
- get_console().print(
- f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged
!= 1 else 'PR'} "
- f"flagged ({', '.join(summary_parts)}).[/]\n"
- )
-
- t_phase4_end = time.monotonic()
+ llm_executor = ThreadPoolExecutor(max_workers=llm_concurrency)
+ llm_future_to_pr = {
+ llm_executor.submit(
+ assess_pr,
+ pr_number=pr.number,
+ pr_title=pr.title,
+ pr_body=pr.body,
+ check_status_summary=pr.check_summary,
+ llm_model=llm_model,
+ ): pr
+ for pr in llm_candidates
+ }
- # Phase 5: Present flagged PRs interactively, grouped by author
- t_phase5_start = time.monotonic()
+ # Phase 4b: Present NOT_RUN PRs for workflow approval while LLM runs
+ total_workflows_approved = 0
total_converted = 0
total_commented = 0
total_closed = 0
@@ -1928,197 +1945,36 @@ def auto_triage(
pr_actions: dict[int, str] = {} # PR number -> action taken by user
quit_early = False
-
- # Build sorted list of flagged PRs grouped by author
- flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if
pr.number in assessments]
- flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(),
pair[0].number))
+ # author_flagged_count is populated below after deterministic flagged PRs
are built,
+ # but we need a preliminary version for the workflow approval phase
from collections import Counter
- author_flagged_count: dict[str, int] = dict(Counter(pr.author_login for
pr, _ in flagged_prs))
-
- current_author: str | None = None
- for pr, assessment in flagged_prs:
- if pr.author_login != current_author:
- current_author = pr.author_login
- count = author_flagged_count[current_author]
- get_console().print()
- get_console().rule(
- f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if
count != 1 else ''})",
- style="cyan",
- )
-
- # Fetch author profile for context (only for flagged PRs)
- author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
-
- comment = _build_comment(
- pr.author_login, assessment.violations, pr.number,
pr.commits_behind, pr.base_ref
- )
- comment_only = _build_comment(
- pr.author_login,
- assessment.violations,
- pr.number,
- pr.commits_behind,
- pr.base_ref,
- comment_only=True,
- )
- close_comment = _build_close_comment(
- pr.author_login,
- assessment.violations,
- pr.number,
- author_flagged_count.get(pr.author_login, 0),
- )
- _display_pr_panel(pr, author_profile, assessment, comment)
-
- default_action, reason = _compute_default_action(pr, assessment,
author_flagged_count)
- if default_action == TriageAction.CLOSE:
- get_console().print(Panel(close_comment, title="Proposed close
comment", border_style="red"))
- get_console().print(f" [bold]{reason}[/]")
-
- if dry_run:
- action_label = {
- TriageAction.DRAFT: "draft",
- TriageAction.COMMENT: "add comment",
- TriageAction.CLOSE: "close",
- TriageAction.READY: "ready",
- TriageAction.SKIP: "skip",
- }[default_action]
- get_console().print(f"[warning]Dry run — would default to:
{action_label}[/]")
- continue
-
- action = prompt_triage_action(
- f"Action for PR {_pr_link(pr)}?",
- default=default_action,
- forced_answer=answer_triage,
- )
-
- if action == TriageAction.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
-
- if action == TriageAction.SKIP:
- get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
- total_skipped_action += 1
- pr_actions[pr.number] = "skipped"
- continue
-
- if action == TriageAction.READY:
- get_console().print(
- f" [info]Marking PR {_pr_link(pr)} as ready — adding
'{_READY_FOR_REVIEW_LABEL}' label.[/]"
- )
- if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
- get_console().print(
- f" [success]Label '{_READY_FOR_REVIEW_LABEL}' added to PR
{_pr_link(pr)}.[/]"
- )
- total_ready += 1
- pr_actions[pr.number] = "ready"
- else:
- get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
- continue
-
- if action == TriageAction.COMMENT:
- get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, comment_only):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_commented += 1
- pr_actions[pr.number] = "commented"
- else:
- get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
- continue
-
- if action == TriageAction.DRAFT:
- get_console().print(f" Converting PR {_pr_link(pr)} to draft...")
- if _convert_pr_to_draft(token, pr.node_id):
- get_console().print(f" [success]PR {_pr_link(pr)} converted
to draft.[/]")
- else:
- get_console().print(f" [error]Failed to convert PR
{_pr_link(pr)} to draft.[/]")
- continue
-
- get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, comment):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_converted += 1
- pr_actions[pr.number] = "drafted"
- else:
- get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
- continue
-
- if action == TriageAction.CLOSE:
- get_console().print(f" Closing PR {_pr_link(pr)}...")
- if _close_pr(token, pr.node_id):
- get_console().print(f" [success]PR {_pr_link(pr)} closed.[/]")
- else:
- get_console().print(f" [error]Failed to close PR
{_pr_link(pr)}.[/]")
- continue
-
- if _add_label(token, github_repository, pr.node_id,
_CLOSED_QUALITY_LABEL):
- get_console().print(
- f" [success]Label '{_CLOSED_QUALITY_LABEL}' added to PR
{_pr_link(pr)}.[/]"
- )
- else:
- get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
-
- get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, close_comment):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_closed += 1
- pr_actions[pr.number] = "closed"
- else:
- get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
-
- # Phase 5b: Present passing PRs for optional ready-for-review marking
- if not quit_early and passing_prs:
- passing_prs.sort(key=lambda p: (p.author_login.lower(), p.number))
- get_console().print(
- f"\n[info]{len(passing_prs)} {'PRs pass' if len(passing_prs) != 1
else 'PR passes'} "
- f"all checks — review to mark as ready:[/]\n"
- )
- for pr in passing_prs:
- author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
- _display_pr_info_panels(pr, author_profile)
-
- if dry_run:
- get_console().print("[warning]Dry run — skipping.[/]")
- continue
-
- action = prompt_triage_action(
- f"Action for PR {_pr_link(pr)}?",
- default=TriageAction.SKIP,
- forced_answer=answer_triage,
- )
-
- if action == TriageAction.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
-
- if action == TriageAction.READY:
- get_console().print(
- f" [info]Marking PR {_pr_link(pr)} as ready "
- f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
- )
- if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
- get_console().print(
- f" [success]Label '{_READY_FOR_REVIEW_LABEL}' added
to PR {_pr_link(pr)}.[/]"
- )
- total_ready += 1
- pr_actions[pr.number] = "ready"
- else:
- get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
- else:
- get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
- total_skipped_action += 1
- pr_actions[pr.number] = "skipped"
+ author_flagged_count: dict[str, int] = dict(
+ Counter(pr.author_login for pr in candidate_prs if pr.number in
assessments)
+ )
- # Phase 6: Present NOT_RUN PRs for workflow approval
- total_workflows_approved = 0
if not quit_early and pending_approval:
pending_approval.sort(key=lambda p: (p.author_login.lower(), p.number))
get_console().print(
f"\n[info]{len(pending_approval)} {'PRs have' if
len(pending_approval) != 1 else 'PR has'} "
- f"no test workflows run — review and approve workflow runs:[/]\n"
+ f"no test workflows run — review and approve workflow runs"
+ f"{' (LLM assessments running in background)' if llm_future_to_pr
else ''}:[/]\n"
)
for pr in pending_approval:
+ # Collect any completed LLM results and show progress
+ if llm_future_to_pr:
+ _collect_llm_results(
+ llm_future_to_pr, llm_assessments, llm_completed,
llm_errors, llm_passing
+ )
+ progress = _llm_progress_status(
+ len(llm_completed),
+ len(llm_future_to_pr),
+ len(llm_assessments),
+ len(llm_errors),
+ )
+ if progress:
+ get_console().print(progress)
+
author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
pending_runs = _find_pending_workflow_runs(token,
github_repository, pr.head_sha)
_display_workflow_approval_panel(pr, author_profile, pending_runs)
@@ -2173,7 +2029,6 @@ def auto_triage(
get_console().print(f" [error]Failed to post comment
on PR {_pr_link(pr)}.[/]")
continue
# For DRAFT or READY, fall through to normal workflow approval
- # (approve workflows first, then triage later)
if dry_run:
get_console().print("[warning]Dry run — skipping workflow
approval.[/]")
@@ -2282,6 +2137,389 @@ def auto_triage(
else:
get_console().print(f" [error]Failed to approve workflow runs
for PR {_pr_link(pr)}.[/]")
+ # Phase 5a: Present deterministically flagged PRs for interactive review
+ # LLM assessments continue running in background during this phase
+
+ # Build sorted list of deterministic-flagged PRs grouped by author
+ det_flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if
pr.number in assessments]
+ det_flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(),
pair[0].number))
+
+ if det_flagged_prs:
+ get_console().print(
+ f"\n[info]Reviewing {len(det_flagged_prs)} deterministically
flagged "
+ f"{'PRs' if len(det_flagged_prs) != 1 else 'PR'}"
+ f"{' (LLM assessments running in background)' if llm_future_to_pr
else ''}...[/]\n"
+ )
+
+ current_author: str | None = None
+ for pr, assessment in det_flagged_prs:
+ if pr.author_login != current_author:
+ current_author = pr.author_login
+ count = author_flagged_count[current_author]
+ get_console().print()
+ get_console().rule(
+ f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if
count != 1 else ''})",
+ style="cyan",
+ )
+
+ # Collect any completed LLM results and show progress
+ if llm_future_to_pr:
+ _collect_llm_results(llm_future_to_pr, llm_assessments,
llm_completed, llm_errors, llm_passing)
+ progress = _llm_progress_status(
+ len(llm_completed),
+ len(llm_future_to_pr),
+ len(llm_assessments),
+ len(llm_errors),
+ )
+ if progress:
+ get_console().print(progress)
+
+ # Fetch author profile for context (only for flagged PRs)
+ author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
+
+ comment = _build_comment(
+ pr.author_login, assessment.violations, pr.number,
pr.commits_behind, pr.base_ref
+ )
+ close_comment = _build_close_comment(
+ pr.author_login,
+ assessment.violations,
+ pr.number,
+ author_flagged_count.get(pr.author_login, 0),
+ )
+ _display_pr_panel(pr, author_profile, assessment, comment)
+
+ default_action, reason = _compute_default_action(pr, assessment,
author_flagged_count)
+ if default_action == TriageAction.CLOSE:
+ get_console().print(Panel(close_comment, title="Proposed close
comment", border_style="red"))
+ get_console().print(f" [bold]{reason}[/]")
+
+ if dry_run:
+ action_label = {
+ TriageAction.DRAFT: "draft",
+ TriageAction.CLOSE: "close",
+ TriageAction.READY: "ready",
+ TriageAction.SKIP: "skip",
+ }[default_action]
+ get_console().print(f"[warning]Dry run — would default to:
{action_label}[/]")
+ continue
+
+ action = prompt_triage_action(
+ f"Action for PR {_pr_link(pr)}?",
+ default=default_action,
+ forced_answer=answer_triage,
+ )
+
+ if action == TriageAction.QUIT:
+ get_console().print("[warning]Quitting.[/]")
+ quit_early = True
+ break
+
+ if action == TriageAction.SKIP:
+ get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
+ total_skipped_action += 1
+ pr_actions[pr.number] = "skipped"
+ continue
+
+ if action == TriageAction.READY:
+ get_console().print(
+ f" [info]Marking PR {_pr_link(pr)} as ready — adding
'{_READY_FOR_REVIEW_LABEL}' label.[/]"
+ )
+ if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
+ get_console().print(
+ f" [success]Label '{_READY_FOR_REVIEW_LABEL}' added to PR
{_pr_link(pr)}.[/]"
+ )
+ total_ready += 1
+ pr_actions[pr.number] = "ready"
+ else:
+ get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
+ continue
+
+ if action == TriageAction.DRAFT:
+ get_console().print(f" Converting PR {_pr_link(pr)} to draft...")
+ if _convert_pr_to_draft(token, pr.node_id):
+ get_console().print(f" [success]PR {_pr_link(pr)} converted
to draft.[/]")
+ else:
+ get_console().print(f" [error]Failed to convert PR
{_pr_link(pr)} to draft.[/]")
+ continue
+
+ get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
+ if _post_comment(token, pr.node_id, comment):
+ get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
+ total_converted += 1
+ pr_actions[pr.number] = "drafted"
+ else:
+ get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
+ continue
+
+ if action == TriageAction.CLOSE:
+ get_console().print(f" Closing PR {_pr_link(pr)}...")
+ if _close_pr(token, pr.node_id):
+ get_console().print(f" [success]PR {_pr_link(pr)} closed.[/]")
+ else:
+ get_console().print(f" [error]Failed to close PR
{_pr_link(pr)}.[/]")
+ continue
+
+ if _add_label(token, github_repository, pr.node_id,
_CLOSED_QUALITY_LABEL):
+ get_console().print(
+ f" [success]Label '{_CLOSED_QUALITY_LABEL}' added to PR
{_pr_link(pr)}.[/]"
+ )
+ else:
+ get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
+
+ get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
+ if _post_comment(token, pr.node_id, close_comment):
+ get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
+ total_closed += 1
+ pr_actions[pr.number] = "closed"
+ else:
+ get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
+
+ # Phase 5b: Present LLM-flagged PRs as they become ready (no blocking wait)
+ total_llm_errors = 0
+ llm_presented: set[int] = set() # PR numbers already presented to user
+ if not quit_early and llm_future_to_pr:
+ # Collect whatever has completed so far
+ _collect_llm_results(llm_future_to_pr, llm_assessments, llm_completed,
llm_errors, llm_passing)
+
+ # Loop: present flagged PRs as they arrive, keep collecting new results
+ while not quit_early:
+ # Find newly flagged PRs not yet presented
+ new_flagged = [
+ (pr, llm_assessments[pr.number])
+ for pr in llm_candidates
+ if pr.number in llm_assessments and pr.number not in
llm_presented
+ ]
+ new_flagged.sort(key=lambda pair: (pair[0].author_login.lower(),
pair[0].number))
+
+ if new_flagged:
+ remaining = len(llm_future_to_pr) - len(llm_completed)
+ status_parts = [f"{len(llm_completed)}/{len(llm_future_to_pr)}
done"]
+ if remaining:
+ status_parts.append(f"{remaining} still running")
+ get_console().print(
+ f"\n[info]{len(new_flagged)} new LLM-flagged "
+ f"{'PRs' if len(new_flagged) != 1 else 'PR'} ready for
review "
+ f"({', '.join(status_parts)}):[/]\n"
+ )
+
+ for pr, assessment in new_flagged:
+ llm_presented.add(pr.number)
+ author_flagged_count[pr.author_login] =
author_flagged_count.get(pr.author_login, 0) + 1
+
+ author_profile = _fetch_author_profile(token,
pr.author_login, github_repository)
+
+ comment = _build_comment(
+ pr.author_login, assessment.violations, pr.number,
pr.commits_behind, pr.base_ref
+ )
+ comment_only = _build_comment(
+ pr.author_login,
+ assessment.violations,
+ pr.number,
+ pr.commits_behind,
+ pr.base_ref,
+ comment_only=True,
+ )
+ close_comment = _build_close_comment(
+ pr.author_login,
+ assessment.violations,
+ pr.number,
+ author_flagged_count.get(pr.author_login, 0),
+ )
+ _display_pr_panel(pr, author_profile, assessment, comment)
+
+ default_action, reason = _compute_default_action(pr,
assessment, author_flagged_count)
+ if default_action == TriageAction.CLOSE:
+ get_console().print(
+ Panel(close_comment, title="Proposed close
comment", border_style="red")
+ )
+ get_console().print(f" [bold]{reason}[/]")
+
+ if dry_run:
+ action_label = {
+ TriageAction.DRAFT: "draft",
+ TriageAction.COMMENT: "add comment",
+ TriageAction.CLOSE: "close",
+ TriageAction.READY: "ready",
+ TriageAction.SKIP: "skip",
+ }[default_action]
+ get_console().print(f"[warning]Dry run — would default
to: {action_label}[/]")
+ # Collect more results while in dry-run
+ _collect_llm_results(
+ llm_future_to_pr, llm_assessments, llm_completed,
llm_errors, llm_passing
+ )
+ continue
+
+ action = prompt_triage_action(
+ f"Action for PR {_pr_link(pr)}?",
+ default=default_action,
+ forced_answer=answer_triage,
+ )
+
+ # While user was deciding, more results may have arrived
+ _collect_llm_results(
+ llm_future_to_pr, llm_assessments, llm_completed,
llm_errors, llm_passing
+ )
+
+ if action == TriageAction.QUIT:
+ get_console().print("[warning]Quitting.[/]")
+ quit_early = True
+ break
+
+ if action == TriageAction.SKIP:
+ get_console().print(f" [info]Skipping PR
{_pr_link(pr)} — no action taken.[/]")
+ total_skipped_action += 1
+ continue
+
+ if action == TriageAction.READY:
+ get_console().print(
+ f" [info]Marking PR {_pr_link(pr)} as ready "
+ f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
+ )
+ if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
+ get_console().print(
+ f" [success]Label '{_READY_FOR_REVIEW_LABEL}'
added to PR {_pr_link(pr)}.[/]"
+ )
+ total_ready += 1
+ else:
+ get_console().print(f" [warning]Failed to add
label to PR {_pr_link(pr)}.[/]")
+ continue
+
+ if action == TriageAction.COMMENT:
+ get_console().print(f" Posting comment on PR
{_pr_link(pr)}...")
+ if _post_comment(token, pr.node_id, comment_only):
+ get_console().print(f" [success]Comment posted on
PR {_pr_link(pr)}.[/]")
+ total_commented += 1
+ else:
+ get_console().print(f" [error]Failed to post
comment on PR {_pr_link(pr)}.[/]")
+ continue
+
+ if action == TriageAction.DRAFT:
+ get_console().print(f" Converting PR {_pr_link(pr)}
to draft...")
+ if _convert_pr_to_draft(token, pr.node_id):
+ get_console().print(f" [success]PR {_pr_link(pr)}
converted to draft.[/]")
+ else:
+ get_console().print(f" [error]Failed to convert
PR {_pr_link(pr)} to draft.[/]")
+ continue
+
+ get_console().print(f" Posting comment on PR
{_pr_link(pr)}...")
+ if _post_comment(token, pr.node_id, comment):
+ get_console().print(f" [success]Comment posted on
PR {_pr_link(pr)}.[/]")
+ total_converted += 1
+ else:
+ get_console().print(f" [error]Failed to post
comment on PR {_pr_link(pr)}.[/]")
+ continue
+
+ if action == TriageAction.CLOSE:
+ get_console().print(f" Closing PR {_pr_link(pr)}...")
+ if _close_pr(token, pr.node_id):
+ get_console().print(f" [success]PR {_pr_link(pr)}
closed.[/]")
+ else:
+ get_console().print(f" [error]Failed to close PR
{_pr_link(pr)}.[/]")
+ continue
+
+ if _add_label(token, github_repository, pr.node_id,
_CLOSED_QUALITY_LABEL):
+ get_console().print(
+ f" [success]Label '{_CLOSED_QUALITY_LABEL}'
added to PR {_pr_link(pr)}.[/]"
+ )
+ else:
+ get_console().print(f" [warning]Failed to add
label to PR {_pr_link(pr)}.[/]")
+
+ get_console().print(f" Posting comment on PR
{_pr_link(pr)}...")
+ if _post_comment(token, pr.node_id, close_comment):
+ get_console().print(f" [success]Comment posted on
PR {_pr_link(pr)}.[/]")
+ total_closed += 1
+ else:
+ get_console().print(f" [error]Failed to post
comment on PR {_pr_link(pr)}.[/]")
+
+ # Check if all futures are done
+ if len(llm_completed) >= len(llm_future_to_pr):
+ break
+
+ # Still pending — wait briefly for more results, then loop back
+ get_console().print(
+ f"[dim]Waiting for {len(llm_future_to_pr) -
len(llm_completed)} "
+ f"remaining LLM {'assessments' if len(llm_future_to_pr) -
len(llm_completed) != 1 else 'assessment'}...[/]"
+ )
+ time.sleep(2)
+ _collect_llm_results(llm_future_to_pr, llm_assessments,
llm_completed, llm_errors, llm_passing)
+
+ total_llm_errors = len(llm_errors)
+ get_console().print(
+ f"\n[info]LLM assessment complete: {len(llm_assessments)} flagged,
"
+ f"{len(llm_passing)} passed, {total_llm_errors} errors "
+ f"(out of {len(llm_future_to_pr)} assessed).[/]\n"
+ )
+
+ # Add LLM passing PRs to the passing list
+ passing_prs.extend(llm_passing)
+
+ # Phase 5c: Present passing PRs for optional ready-for-review marking
+ if not quit_early and passing_prs:
+ passing_prs.sort(key=lambda p: (p.author_login.lower(), p.number))
+ get_console().print(
+ f"\n[info]{len(passing_prs)} {'PRs pass' if len(passing_prs) != 1
else 'PR passes'} "
+ f"all checks — review to mark as ready:[/]\n"
+ )
+ for pr in passing_prs:
+ author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
+ _display_pr_info_panels(pr, author_profile)
+
+ if dry_run:
+ get_console().print("[warning]Dry run — skipping.[/]")
+ continue
+
+ action = prompt_triage_action(
+ f"Action for PR {_pr_link(pr)}?",
+ default=TriageAction.SKIP,
+ forced_answer=answer_triage,
+ )
+
+ if action == TriageAction.QUIT:
+ get_console().print("[warning]Quitting.[/]")
+ quit_early = True
+ break
+
+ if action == TriageAction.SKIP:
+ get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
+ total_skipped_action += 1
+ pr_actions[pr.number] = "skipped"
+ continue
+
+ if action == TriageAction.READY:
+ get_console().print(
+ f" [info]Marking PR {_pr_link(pr)} as ready "
+ f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
+ )
+ if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
+ get_console().print(
+ f" [success]Label '{_READY_FOR_REVIEW_LABEL}' added
to PR {_pr_link(pr)}.[/]"
+ )
+ total_ready += 1
+ pr_actions[pr.number] = "ready"
+ else:
+ get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
+ else:
+ get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
+ total_skipped_action += 1
+ pr_actions[pr.number] = "skipped"
+
+ # Shut down LLM executor if it was started
+ if llm_executor is not None:
+ llm_executor.shutdown(wait=False, cancel_futures=True)
+
+ # Combine flagged counts for summary
+ total_flagged = total_deterministic_flags + len(llm_assessments)
+ total_llm_flagged = len(llm_assessments)
+
+ get_console().print(
+ f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged
!= 1 else 'PR'} "
+ f"flagged ({total_deterministic_flags} CI/conflicts/comments, "
+ f"{total_llm_flagged} LLM-flagged"
+ f"{f', {total_llm_errors} LLM errors' if total_llm_errors else ''}"
+ f"{f', {len(pending_approval)} awaiting workflow approval' if
pending_approval else ''}"
+ f").[/]\n"
+ )
+
# Summary
t_total_end = time.monotonic()
get_console().print()
@@ -2297,7 +2535,7 @@ def auto_triage(
summary_table.add_row("PRs skipped (filtered)", str(total_skipped))
summary_table.add_row("PRs assessed", str(len(candidate_prs)))
summary_table.add_row("Flagged by CI/conflicts/comments",
str(total_deterministic_flags))
- summary_table.add_row("Flagged by LLM", str(total_flagged -
total_deterministic_flags))
+ summary_table.add_row("Flagged by LLM", str(total_llm_flagged))
summary_table.add_row("LLM errors (skipped)", str(total_llm_errors))
summary_table.add_row("Total flagged", str(total_flagged))
summary_table.add_row("PRs passing all checks", str(len(passing_prs)))
@@ -2367,23 +2605,25 @@ def auto_triage(
else:
timing_table.add_row("Deterministic triage", "[dim]—[/]", "0",
"[dim]—[/]", "[dim]—[/]", "[dim]—[/]")
- if pr_timings:
- llm_values = list(pr_timings.values())
- llm_total = sum(llm_values)
+ llm_count = len(llm_completed)
+ llm_wall_time = t_total_end - t_phase2c_end # LLM runs in background
across all interactive phases
+ if llm_count:
timing_table.add_row(
- "LLM assessment",
- _fmt_duration(t_phase4_end - t_phase2c_end),
- str(len(llm_values)),
- _fmt_duration(llm_total / len(llm_values)),
- _fmt_duration(min(llm_values)),
- _fmt_duration(max(llm_values)),
+ "LLM assessment (background)",
+ _fmt_duration(llm_wall_time),
+ str(llm_count),
+ "[dim]—[/]",
+ "[dim]—[/]",
+ "[dim]—[/]",
)
else:
- timing_table.add_row("LLM assessment", "[dim]—[/]", "0", "[dim]—[/]",
"[dim]—[/]", "[dim]—[/]")
+ timing_table.add_row(
+ "LLM assessment (background)", "[dim]—[/]", "0", "[dim]—[/]",
"[dim]—[/]", "[dim]—[/]"
+ )
timing_table.add_row(
- "Interactive review",
- _fmt_duration(t_total_end - t_phase5_start),
+ "Interactive review (overlaps LLM)",
+ _fmt_duration(t_total_end - t_phase2c_end),
"",
"",
"",
@@ -2400,7 +2640,7 @@ def auto_triage(
)
get_console().print(timing_table)
- if deterministic_timings or pr_timings:
+ if deterministic_timings:
pr_titles = {pr.number: pr.title for pr in candidate_prs}
# Amortize batch fetch time evenly across candidate PRs
num_candidates = len(candidate_prs) or 1
@@ -2423,24 +2663,23 @@ def auto_triage(
pr_timing_table.add_column("Action")
pr_timing_table.add_column("Fetch (avg)", justify="right")
pr_timing_table.add_column("Deterministic", justify="right")
- pr_timing_table.add_column("LLM", justify="right")
pr_timing_table.add_column("Total", justify="right")
- # Collect all PR numbers that went through any phase
all_pr_numbers = sorted(
- deterministic_timings.keys() | pr_timings.keys(),
- key=lambda n: deterministic_timings.get(n, 0) + pr_timings.get(n,
0) + fetch_per_pr,
+ deterministic_timings.keys(),
+ key=lambda n: deterministic_timings.get(n, 0) + fetch_per_pr,
reverse=True,
)
for pr_num in all_pr_numbers:
title = pr_titles.get(pr_num, "")[:60]
det_time = deterministic_timings.get(pr_num, 0)
- llm_time = pr_timings.get(pr_num, 0)
- total_time = fetch_per_pr + det_time + llm_time
+ total_time = fetch_per_pr + det_time
- if pr_num in assessments:
+ if pr_num in assessments or pr_num in llm_assessments:
result = "[red]flagged[/]"
- elif any(pr.number == pr_num for pr in passing_prs):
+ elif any(pr.number == pr_num for pr in passing_prs) or any(
+ pr.number == pr_num for pr in llm_passing
+ ):
result = "[success]passed[/]"
elif any(pr.number == pr_num for pr in pending_approval):
result = "[dim]pending[/]"
@@ -2457,7 +2696,6 @@ def auto_triage(
action_display,
_fmt_duration(fetch_per_pr),
_fmt_duration(det_time) if det_time else "[dim]—[/]",
- _fmt_duration(llm_time) if llm_time else "[dim]—[/]",
_fmt_duration(total_time),
)
get_console().print(pr_timing_table)