potiuk commented on code in PR #63319:
URL: https://github.com/apache/airflow/pull/63319#discussion_r2918196547
##########
dev/breeze/src/airflow_breeze/commands/pr_commands.py:
##########
@@ -1836,420 +2465,76 @@ def auto_triage(
if pending_approval:
skipped_detail += f", {len(pending_approval)} awaiting workflow
approval"
get_console().print(
- f"\n[info]Running LLM assessment for {len(llm_candidates)} "
- f"{'PRs' if len(llm_candidates) != 1 else 'PR'} (skipped
{skipped_detail})...[/]\n"
+ f"\n[info]Starting LLM assessment for {len(llm_candidates)} "
+ f"{'PRs' if len(llm_candidates) != 1 else 'PR'} in background "
+ f"(skipped {skipped_detail})...[/]\n"
)
- with ThreadPoolExecutor(max_workers=llm_concurrency) as executor:
- future_to_pr = {
- executor.submit(
- assess_pr,
- pr_number=pr.number,
- pr_title=pr.title,
- pr_body=pr.body,
- check_status_summary=pr.check_summary,
- llm_model=llm_model,
- ): pr
- for pr in llm_candidates
- }
- for future in as_completed(future_to_pr):
- pr = future_to_pr[future]
- assessment = future.result()
- if assessment.error:
- total_llm_errors += 1
- continue
- if not assessment.should_flag:
- get_console().print(f" [success]PR {_pr_link(pr)} passes
quality check.[/]")
- passing_prs.append(pr)
- continue
- assessments[pr.number] = assessment
-
- total_flagged = len(assessments)
- summary_parts = [
- f"{total_deterministic_flags} CI/conflicts/comments",
- f"{total_flagged - total_deterministic_flags} LLM-flagged",
- ]
- if pending_approval:
- summary_parts.append(f"{len(pending_approval)} awaiting workflow
approval")
- if total_llm_errors:
- summary_parts.append(f"{total_llm_errors} LLM errors")
- get_console().print(
- f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged
!= 1 else 'PR'} "
- f"flagged ({', '.join(summary_parts)}).[/]\n"
- )
+ llm_executor = ThreadPoolExecutor(max_workers=llm_concurrency)
+ llm_future_to_pr = {
+ llm_executor.submit(
+ assess_pr,
+ pr_number=pr.number,
+ pr_title=pr.title,
+ pr_body=pr.body,
+ check_status_summary=pr.check_summary,
+ llm_model=llm_model,
+ ): pr
+ for pr in llm_candidates
+ }
- # Phase 5: Present flagged PRs interactively, grouped by author
- total_converted = 0
- total_commented = 0
- total_closed = 0
- total_ready = 0
- total_skipped_action = 0
- quit_early = False
-
- # Build sorted list of flagged PRs grouped by author
- flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if
pr.number in assessments]
- flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(),
pair[0].number))
+ # Build shared triage context and stats
from collections import Counter
- author_flagged_count: dict[str, int] = dict(Counter(pr.author_login for
pr, _ in flagged_prs))
-
- current_author: str | None = None
- for pr, assessment in flagged_prs:
- if pr.author_login != current_author:
- current_author = pr.author_login
- count = author_flagged_count[current_author]
- get_console().print()
- get_console().rule(
- f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if
count != 1 else ''})",
- style="cyan",
- )
-
- # Fetch author profile for context (only for flagged PRs)
- author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
-
- comment = _build_comment(
- pr.author_login, assessment.violations, pr.number,
pr.commits_behind, pr.base_ref
- )
- comment_only = _build_comment(
- pr.author_login,
- assessment.violations,
- pr.number,
- pr.commits_behind,
- pr.base_ref,
- comment_only=True,
- )
- close_comment = _build_close_comment(
- pr.author_login,
- assessment.violations,
- pr.number,
- author_flagged_count.get(pr.author_login, 0),
- )
- _display_pr_panel(pr, author_profile, assessment, comment)
-
- default_action, reason = _compute_default_action(pr, assessment,
author_flagged_count)
- if default_action == TriageAction.CLOSE:
- get_console().print(Panel(close_comment, title="Proposed close
comment", border_style="red"))
- get_console().print(f" [bold]{reason}[/]")
-
- if dry_run:
- action_label = {
- TriageAction.DRAFT: "draft",
- TriageAction.COMMENT: "add comment",
- TriageAction.CLOSE: "close",
- TriageAction.READY: "ready",
- TriageAction.SKIP: "skip",
- }[default_action]
- get_console().print(f"[warning]Dry run — would default to:
{action_label}[/]")
- continue
-
- action = prompt_triage_action(
- f"Action for PR {_pr_link(pr)}?",
- default=default_action,
- forced_answer=answer_triage,
- )
-
- if action == TriageAction.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
-
- if action == TriageAction.SKIP:
- get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
- total_skipped_action += 1
- continue
-
- if action == TriageAction.READY:
- get_console().print(
- f" [info]Marking PR {_pr_link(pr)} as ready — adding
'{_READY_FOR_REVIEW_LABEL}' label.[/]"
- )
- if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
- get_console().print(
- f" [success]Label '{_READY_FOR_REVIEW_LABEL}' added to PR
{_pr_link(pr)}.[/]"
- )
- total_ready += 1
- else:
- get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
- continue
-
- if action == TriageAction.COMMENT:
- get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, comment_only):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_commented += 1
- else:
- get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
- continue
-
- if action == TriageAction.DRAFT:
- get_console().print(f" Converting PR {_pr_link(pr)} to draft...")
- if _convert_pr_to_draft(token, pr.node_id):
- get_console().print(f" [success]PR {_pr_link(pr)} converted
to draft.[/]")
- else:
- get_console().print(f" [error]Failed to convert PR
{_pr_link(pr)} to draft.[/]")
- continue
-
- get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, comment):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_converted += 1
- else:
- get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
- continue
-
- if action == TriageAction.CLOSE:
- get_console().print(f" Closing PR {_pr_link(pr)}...")
- if _close_pr(token, pr.node_id):
- get_console().print(f" [success]PR {_pr_link(pr)} closed.[/]")
- else:
- get_console().print(f" [error]Failed to close PR
{_pr_link(pr)}.[/]")
- continue
-
- if _add_label(token, github_repository, pr.node_id,
_CLOSED_QUALITY_LABEL):
- get_console().print(
- f" [success]Label '{_CLOSED_QUALITY_LABEL}' added to PR
{_pr_link(pr)}.[/]"
- )
- else:
- get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
-
- get_console().print(f" Posting comment on PR {_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, close_comment):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_closed += 1
- else:
- get_console().print(f" [error]Failed to post comment on PR
{_pr_link(pr)}.[/]")
-
- # Phase 5b: Present passing PRs for optional ready-for-review marking
- if not quit_early and passing_prs:
- passing_prs.sort(key=lambda p: (p.author_login.lower(), p.number))
- get_console().print(
- f"\n[info]{len(passing_prs)} {'PRs pass' if len(passing_prs) != 1
else 'PR passes'} "
- f"all checks — review to mark as ready:[/]\n"
- )
- for pr in passing_prs:
- author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
- _display_pr_info_panels(pr, author_profile)
-
- if dry_run:
- get_console().print("[warning]Dry run — skipping.[/]")
- continue
-
- action = prompt_triage_action(
- f"Action for PR {_pr_link(pr)}?",
- default=TriageAction.SKIP,
- forced_answer=answer_triage,
- )
-
- if action == TriageAction.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
-
- if action == TriageAction.READY:
- get_console().print(
- f" [info]Marking PR {_pr_link(pr)} as ready "
- f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
- )
- if _add_label(token, github_repository, pr.node_id,
_READY_FOR_REVIEW_LABEL):
- get_console().print(
- f" [success]Label '{_READY_FOR_REVIEW_LABEL}' added
to PR {_pr_link(pr)}.[/]"
- )
- total_ready += 1
- else:
- get_console().print(f" [warning]Failed to add label to PR
{_pr_link(pr)}.[/]")
- else:
- get_console().print(f" [info]Skipping PR {_pr_link(pr)} — no
action taken.[/]")
- total_skipped_action += 1
-
- # Phase 6: Present NOT_RUN PRs for workflow approval
- total_workflows_approved = 0
- if not quit_early and pending_approval:
- pending_approval.sort(key=lambda p: (p.author_login.lower(), p.number))
- get_console().print(
- f"\n[info]{len(pending_approval)} {'PRs have' if
len(pending_approval) != 1 else 'PR has'} "
- f"no test workflows run — review and approve workflow runs:[/]\n"
- )
- for pr in pending_approval:
- author_profile = _fetch_author_profile(token, pr.author_login,
github_repository)
- pending_runs = _find_pending_workflow_runs(token,
github_repository, pr.head_sha)
- _display_workflow_approval_panel(pr, author_profile, pending_runs)
-
- # If author exceeds the close threshold, suggest closing instead
of approving
- author_count = author_flagged_count.get(pr.author_login, 0)
- if author_count > 3:
- get_console().print(
- f" [bold red]Author {pr.author_login} has {author_count}
flagged "
- f"{'PRs' if author_count != 1 else 'PR'} "
- f"— suggesting close instead of workflow approval.[/]"
- )
- close_comment = _build_close_comment(pr.author_login, [],
pr.number, author_count)
- get_console().print(Panel(close_comment, title="Proposed close
comment", border_style="red"))
-
- if dry_run:
- get_console().print("[warning]Dry run — would default to:
close[/]")
- continue
-
- action = prompt_triage_action(
- f"Action for PR {_pr_link(pr)}?",
- default=TriageAction.CLOSE,
- forced_answer=answer_triage,
- )
- if action == TriageAction.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
- if action == TriageAction.SKIP:
- get_console().print(f" [info]Skipping PR {_pr_link(pr)} —
no action taken.[/]")
- continue
- if action == TriageAction.CLOSE:
- get_console().print(f" Closing PR {_pr_link(pr)}...")
- if _close_pr(token, pr.node_id):
- get_console().print(f" [success]PR {_pr_link(pr)}
closed.[/]")
- else:
- get_console().print(f" [error]Failed to close PR
{_pr_link(pr)}.[/]")
- continue
- if _add_label(token, github_repository, pr.node_id,
_CLOSED_QUALITY_LABEL):
- get_console().print(
- f" [success]Label '{_CLOSED_QUALITY_LABEL}' added
to PR {_pr_link(pr)}.[/]"
- )
- else:
- get_console().print(f" [warning]Failed to add label
to PR {_pr_link(pr)}.[/]")
- get_console().print(f" Posting comment on PR
{_pr_link(pr)}...")
- if _post_comment(token, pr.node_id, close_comment):
- get_console().print(f" [success]Comment posted on PR
{_pr_link(pr)}.[/]")
- total_closed += 1
- else:
- get_console().print(f" [error]Failed to post comment
on PR {_pr_link(pr)}.[/]")
- continue
- # For DRAFT or READY, fall through to normal workflow approval
- # (approve workflows first, then triage later)
-
- if dry_run:
- get_console().print("[warning]Dry run — skipping workflow
approval.[/]")
- continue
-
- if not pending_runs:
- get_console().print(
- f" [dim]No pending workflow runs found for PR
{_pr_link(pr)}. "
- f"Workflows may need to be triggered manually.[/]"
- )
- continue
-
- answer = user_confirm(
- f"Review diff for PR {_pr_link(pr)} before approving
workflows?",
- forced_answer=answer_triage,
- )
- if answer == Answer.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
- if answer == Answer.NO:
- get_console().print(f" [info]Skipping workflow approval for
PR {_pr_link(pr)}.[/]")
- continue
-
- get_console().print(f" Fetching diff for PR {_pr_link(pr)}...")
- diff_text = _fetch_pr_diff(token, github_repository, pr.number)
- if diff_text:
- from rich.syntax import Syntax
-
- get_console().print(
- Panel(
- Syntax(diff_text, "diff", theme="monokai",
word_wrap=True),
- title=f"Diff for PR {_pr_link(pr)}",
- border_style="bright_cyan",
- )
- )
- else:
- get_console().print(
- f" [warning]Could not fetch diff for PR {_pr_link(pr)}. "
- f"Review manually at: {pr.url}/files[/]"
- )
-
- answer = user_confirm(
- f"No suspicious changes found in PR {_pr_link(pr)}? "
- f"Approve {len(pending_runs)} workflow {'runs' if
len(pending_runs) != 1 else 'run'}?",
- forced_answer=answer_triage,
- )
- if answer == Answer.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
- if answer == Answer.NO:
- get_console().print(
- f"\n [bold red]Suspicious changes detected in PR
{_pr_link(pr)} by {pr.author_login}.[/]"
- )
- get_console().print(f" Fetching all open PRs by
{pr.author_login}...")
- author_prs = _fetch_author_open_prs(token, github_repository,
pr.author_login)
- if not author_prs:
- get_console().print(f" [dim]No open PRs found for
{pr.author_login}.[/]")
- continue
-
- get_console().print()
- get_console().print(
- f" [bold red]The following {len(author_prs)} "
- f"{'PRs' if len(author_prs) != 1 else 'PR'} by "
- f"{pr.author_login} will be closed, labeled "
- f"'{_SUSPICIOUS_CHANGES_LABEL}', and commented:[/]"
- )
- for pr_info in author_prs:
- get_console().print(
- f" -
[link={pr_info['url']}]#{pr_info['number']}[/link] {pr_info['title']}"
- )
- get_console().print()
-
- confirm = user_confirm(
- f"Close all {len(author_prs)} {'PRs' if len(author_prs) !=
1 else 'PR'} "
- f"by {pr.author_login} and label as suspicious?",
- forced_answer=answer_triage,
- )
- if confirm == Answer.QUIT:
- get_console().print("[warning]Quitting.[/]")
- quit_early = True
- break
- if confirm == Answer.NO:
- get_console().print(f" [info]Skipping — no PRs closed for
{pr.author_login}.[/]")
- continue
-
- closed, commented = _close_suspicious_prs(token,
github_repository, author_prs, pr.number)
- get_console().print(
- f" [success]Closed {closed}/{len(author_prs)} "
- f"{'PRs' if len(author_prs) != 1 else 'PR'}, commented on
{commented}.[/]"
- )
- total_closed += closed
- continue
-
- approved = _approve_workflow_runs(token, github_repository,
pending_runs)
- if approved:
- get_console().print(
- f" [success]Approved {approved}/{len(pending_runs)}
workflow "
- f"{'runs' if len(pending_runs) != 1 else 'run'} for PR "
- f"{_pr_link(pr)}.[/]"
- )
- total_workflows_approved += 1
- else:
- get_console().print(f" [error]Failed to approve workflow runs
for PR {_pr_link(pr)}.[/]")
+ author_flagged_count: dict[str, int] = dict(
+ Counter(pr.author_login for pr in candidate_prs if pr.number in
assessments)
+ )
+ stats = TriageStats()
+ ctx = TriageContext(
+ token=token,
+ github_repository=github_repository,
+ dry_run=dry_run,
+ answer_triage=answer_triage,
+ stats=stats,
+ author_flagged_count=author_flagged_count,
+ llm_future_to_pr=llm_future_to_pr,
+ llm_assessments=llm_assessments,
+ llm_completed=llm_completed,
+ llm_errors=llm_errors,
+ llm_passing=llm_passing,
+ )
- # Summary
- get_console().print()
- summary_table = Table(title="Summary")
- summary_table.add_column("Metric", style="bold")
- summary_table.add_column("Count", justify="right")
- total_skipped = total_skipped_collaborator + total_skipped_bot +
total_skipped_accepted
- summary_table.add_row("PRs fetched", str(len(all_prs)))
- if verbose:
- summary_table.add_row("Collaborators skipped",
str(total_skipped_collaborator))
- summary_table.add_row("Bots skipped", str(total_skipped_bot))
- summary_table.add_row("Ready-for-review skipped",
str(total_skipped_accepted))
- summary_table.add_row("PRs skipped (filtered)", str(total_skipped))
- summary_table.add_row("PRs assessed", str(len(candidate_prs)))
- summary_table.add_row("Flagged by CI/conflicts/comments",
str(total_deterministic_flags))
- summary_table.add_row("Flagged by LLM", str(total_flagged -
total_deterministic_flags))
- summary_table.add_row("LLM errors (skipped)", str(total_llm_errors))
- summary_table.add_row("Total flagged", str(total_flagged))
- summary_table.add_row("PRs passing all checks", str(len(passing_prs)))
- summary_table.add_row("PRs converted to draft", str(total_converted))
- summary_table.add_row("PRs commented (not drafted)", str(total_commented))
- summary_table.add_row("PRs closed", str(total_closed))
- summary_table.add_row("PRs marked ready for review", str(total_ready))
- summary_table.add_row("PRs skipped (no action)", str(total_skipped_action))
- summary_table.add_row("Awaiting workflow approval",
str(len(pending_approval)))
- summary_table.add_row("PRs with workflows approved",
str(total_workflows_approved))
- get_console().print(summary_table)
+ # Phase 4b: Present NOT_RUN PRs for workflow approval (LLM runs in
background)
+ _review_workflow_approval_prs(ctx, pending_approval)
+
+ # Phase 5a: Present deterministically flagged PRs
+ det_flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if
pr.number in assessments]
+ det_flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(),
pair[0].number))
+ _review_deterministic_flagged_prs(ctx, det_flagged_prs)
+
+ # Phase 5b: Present LLM-flagged PRs as they become ready (streaming)
+ _review_llm_flagged_prs(ctx, llm_candidates)
+
+ # Add LLM passing PRs to the passing list
+ passing_prs.extend(llm_passing)
+
+ # Phase 5c: Present passing PRs for optional ready-for-review marking
+ _review_passing_prs(ctx, passing_prs)
+
+ # Shut down LLM executor if it was started
+ if llm_executor is not None:
+ llm_executor.shutdown(wait=False)
Review Comment:
Good point
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org