potiuk commented on code in PR #62682: URL: https://github.com/apache/airflow/pull/62682#discussion_r2885577585
########## dev/breeze/src/airflow_breeze/commands/pr_commands.py: ########## @@ -0,0 +1,1711 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass + +import click +from rich.panel import Panel +from rich.table import Table + +from airflow_breeze.commands.common_options import ( + option_answer, + option_dry_run, + option_github_repository, + option_github_token, + option_llm_model, + option_verbose, +) +from airflow_breeze.utils.click_utils import BreezeGroup +from airflow_breeze.utils.confirm import Answer, TriageAction, prompt_triage_action, user_confirm +from airflow_breeze.utils.console import get_console +from airflow_breeze.utils.custom_param_types import NotVerifiedBetterChoice +from airflow_breeze.utils.run_utils import run_command +from airflow_breeze.utils.shared_options import get_dry_run, get_verbose + +QUALITY_CRITERIA_LINK = ( + "[Pull Request quality criteria](https://github.com/apache/airflow/blob/main/" + "contributing-docs/05_pull_requests.rst#pull-request-quality-criteria)" +) + +# authorAssociation values that indicate the author has write access +_COLLABORATOR_ASSOCIATIONS = {"COLLABORATOR", "MEMBER", "OWNER"} + +# Label applied when a maintainer marks a flagged PR as ready for review +_READY_FOR_REVIEW_LABEL = "ready for maintainer review" + +# Label applied when a PR is closed due to multiple quality violations +_CLOSED_QUALITY_LABEL = "closed because of multiple quality violations" + +# Label applied when a PR is closed due to suspicious changes +_SUSPICIOUS_CHANGES_LABEL = "suspicious changes detected" + +# GitHub accounts that should be auto-skipped during triage +_BOT_ACCOUNT_LOGINS = {"dependabot", "dependabot[bot]", "renovate[bot]", "github-actions[bot]"} + +_SEARCH_PRS_QUERY = """ +query($query: String!, $first: Int!, $after: String) { + search(query: $query, type: ISSUE, first: $first, after: $after) { + issueCount + pageInfo { + hasNextPage + endCursor + } + nodes { + ... on PullRequest { + number + title + body + url + createdAt + updatedAt + id + author { login } + authorAssociation + baseRefName + mergeable + labels(first: 20) { + nodes { name } + } + commits(last: 1) { + nodes { + commit { + oid + statusCheckRollup { + state + } + } + } + } + } + } + } +} +""" + +_CHECK_CONTEXTS_QUERY = """ +query($owner: String!, $repo: String!, $oid: GitObjectID!, $first: Int!, $after: String) { + repository(owner: $owner, name: $repo) { + object(oid: $oid) { + ... on Commit { + statusCheckRollup { + contexts(first: $first, after: $after) { + totalCount + pageInfo { + hasNextPage + endCursor + } + nodes { + ... on CheckRun { + __typename + name + conclusion + status + } + ... on StatusContext { + __typename + context + state + } + } + } + } + } + } + } +} +""" + +_AUTHOR_PROFILE_QUERY = """ +query( + $login: String!, + $repoAll: String!, $repoMerged: String!, $repoClosed: String!, + $globalAll: String!, $globalMerged: String!, $globalClosed: String! +) { + user(login: $login) { + createdAt + repositoriesContributedTo( + first: 10, + contributionTypes: [COMMIT, PULL_REQUEST], + orderBy: {field: STARGAZERS, direction: DESC} + ) { + totalCount + nodes { + nameWithOwner + url + stargazerCount + isPrivate + } + } + } + repoAll: search(query: $repoAll, type: ISSUE) { issueCount } + repoMerged: search(query: $repoMerged, type: ISSUE) { issueCount } + repoClosed: search(query: $repoClosed, type: ISSUE) { issueCount } + globalAll: search(query: $globalAll, type: ISSUE) { issueCount } + globalMerged: search(query: $globalMerged, type: ISSUE) { issueCount } + globalClosed: search(query: $globalClosed, type: ISSUE) { issueCount } +} +""" + +_CONVERT_TO_DRAFT_MUTATION = """ +mutation($prId: ID!) { + convertPullRequestToDraft(input: {pullRequestId: $prId}) { + pullRequest { id } + } +} +""" + +_ADD_COMMENT_MUTATION = """ +mutation($subjectId: ID!, $body: String!) { + addComment(input: {subjectId: $subjectId, body: $body}) { + commentEdge { node { id } } + } +} +""" + +_ADD_LABELS_MUTATION = """ +mutation($labelableId: ID!, $labelIds: [ID!]!) { + addLabelsToLabelable(input: {labelableId: $labelableId, labelIds: $labelIds}) { + labelable { ... on PullRequest { id } } + } +} +""" + +_GET_LABEL_ID_QUERY = """ +query($owner: String!, $repo: String!, $name: String!) { + repository(owner: $owner, name: $repo) { + label(name: $name) { id } + } +} +""" + +_CLOSE_PR_MUTATION = """ +mutation($prId: ID!) { + closePullRequest(input: {pullRequestId: $prId}) { + pullRequest { id } + } +} +""" + + +@dataclass +class PRData: + """PR data fetched from GraphQL.""" + + number: int + title: str + body: str + url: str + created_at: str + updated_at: str + node_id: str + author_login: str + author_association: str + head_sha: str + base_ref: str # e.g. "main" + check_summary: str + checks_state: str # statusCheckRollup.state: SUCCESS, FAILURE, PENDING, etc. + failed_checks: list[str] # best-effort list of individual failing check names + commits_behind: int # how many commits behind the base branch + mergeable: str # MERGEABLE, CONFLICTING, or UNKNOWN + labels: list[str] # label names attached to this PR + + [email protected](cls=BreezeGroup, name="pr", help="Tools for managing GitHub pull requests.") +def pr_group(): + pass + + +def _resolve_github_token(github_token: str | None) -> str | None: + """Resolve GitHub token from option, environment, or gh CLI.""" + if github_token: + return github_token + gh_token_result = run_command( + ["gh", "auth", "token"], + capture_output=True, + text=True, + check=False, + dry_run_override=False, + ) + if gh_token_result.returncode == 0: + return gh_token_result.stdout.strip() + return None + + +def _graphql_request(token: str, query: str, variables: dict) -> dict: + """Execute a GitHub GraphQL request. Returns the 'data' dict or exits on error.""" + import requests + + response = requests.post( + "https://api.github.com/graphql", + json={"query": query, "variables": variables}, + headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, + timeout=60, + ) + if response.status_code != 200: + get_console().print(f"[error]GraphQL request failed: {response.status_code} {response.text}[/]") + sys.exit(1) + result = response.json() + if "errors" in result: + get_console().print(f"[error]GraphQL errors: {result['errors']}[/]") + sys.exit(1) + return result["data"] + + +_CHECK_FAILURE_CONCLUSIONS = {"FAILURE", "TIMED_OUT", "ACTION_REQUIRED"} +_STATUS_FAILURE_STATES = {"FAILURE", "ERROR"} + +# Batch sizes for chunked GraphQL queries to avoid GitHub timeout errors +_CHECK_DETAIL_BATCH_SIZE = 10 +_COMMITS_BEHIND_BATCH_SIZE = 20 + +# Substrings that indicate a check is from a CI test workflow (not just labelers/bots) +_TEST_WORKFLOW_PATTERNS = [ + "test", + "static check", + "build", + "ci image", + "prod image", + "helm", + "k8s", + "basic", + "unit", + "integration", + "provider", + "mypy", + "pre-commit", + "docs", +] + + +def _is_test_check(name: str) -> bool: + """Return True if the check name looks like a CI test workflow (not just a bot/labeler).""" + lower = name.lower() + return any(p in lower for p in _TEST_WORKFLOW_PATTERNS) + + +def _extract_basic_check_info(pr_node: dict) -> tuple[str, str]: + """Extract basic check info from a lightweight GraphQL PR node (no contexts). + + Returns (head_sha, rollup_state). + """ + commits = pr_node.get("commits", {}).get("nodes", []) + if not commits: + return "", "UNKNOWN" + commit = commits[0].get("commit", {}) + head_sha = commit.get("oid", "") + rollup = commit.get("statusCheckRollup") + if not rollup: + return head_sha, "UNKNOWN" + return head_sha, rollup.get("state", "UNKNOWN") + + +def _process_check_contexts(contexts: list[dict], total_count: int) -> tuple[str, list[str], bool]: + """Process check context nodes into summary text, failed names, and test-check presence. + + Returns (summary_text, failed_check_names, has_test_checks). + """ + lines: list[str] = [] + failed: list[str] = [] + has_test_checks = False + for ctx in contexts: + typename = ctx.get("__typename") + if typename == "CheckRun": + name = ctx.get("name", "unknown") + conclusion = ctx.get("conclusion") or ctx.get("status") or "unknown" + lines.append(f" {name}: {conclusion}") + if _is_test_check(name): + has_test_checks = True + if conclusion.upper() in _CHECK_FAILURE_CONCLUSIONS: + failed.append(name) + elif typename == "StatusContext": + name = ctx.get("context", "unknown") + state = ctx.get("state", "unknown") + lines.append(f" {name}: {state}") + if _is_test_check(name): + has_test_checks = True + if state.upper() in _STATUS_FAILURE_STATES: + failed.append(name) + if total_count > len(contexts): + extra = total_count - len(contexts) + lines.append(f" ... ({extra} more {'checks' if extra != 1 else 'check'} not shown)") + summary = "\n".join(lines) if lines else "No check runs found." + return summary, failed, has_test_checks + + +def _fetch_failed_checks(token: str, github_repository: str, head_sha: str) -> list[str]: + """Fetch all failing check names for a commit by paginating through check contexts.""" + owner, repo = github_repository.split("/", 1) + failed: list[str] = [] + cursor: str | None = None + + while True: + variables: dict = {"owner": owner, "repo": repo, "oid": head_sha, "first": 100} + if cursor: + variables["after"] = cursor + + data = _graphql_request(token, _CHECK_CONTEXTS_QUERY, variables) + rollup = (data.get("repository", {}).get("object", {}) or {}).get("statusCheckRollup") + if not rollup: + break + + contexts_data = rollup.get("contexts", {}) + for ctx in contexts_data.get("nodes", []): + typename = ctx.get("__typename") + if typename == "CheckRun": + conclusion = ctx.get("conclusion") or ctx.get("status") or "unknown" + if conclusion.upper() in _CHECK_FAILURE_CONCLUSIONS: + failed.append(ctx.get("name", "unknown")) + elif typename == "StatusContext": + state = ctx.get("state", "unknown") + if state.upper() in _STATUS_FAILURE_STATES: + failed.append(ctx.get("context", "unknown")) + + page_info = contexts_data.get("pageInfo", {}) + if not page_info.get("hasNextPage"): + break + cursor = page_info.get("endCursor") + + return failed + + +def _fetch_check_details_batch(token: str, github_repository: str, prs: list[PRData]) -> None: + """Fetch detailed check contexts for PRs in chunked GraphQL queries. + + Updates each PR's check_summary, checks_state, and failed_checks in-place. + Processes in chunks of _CHECK_DETAIL_BATCH_SIZE to avoid GitHub timeout errors. + """ + owner, repo = github_repository.split("/", 1) + eligible = [pr for pr in prs if pr.head_sha] + if not eligible: + return + + for chunk_start in range(0, len(eligible), _CHECK_DETAIL_BATCH_SIZE): + chunk = eligible[chunk_start : chunk_start + _CHECK_DETAIL_BATCH_SIZE] + + object_fields = [] + for pr in chunk: + alias = f"pr{pr.number}" + object_fields.append( + f' {alias}: object(oid: "{pr.head_sha}") {{\n' + f" ... on Commit {{\n" + f" statusCheckRollup {{\n" + f" state\n" + f" contexts(first: 100) {{\n" + f" totalCount\n" + f" nodes {{\n" + f" ... on CheckRun {{ __typename name conclusion status }}\n" + f" ... on StatusContext {{ __typename context state }}\n" + f" }}\n" + f" }}\n" + f" }}\n" + f" }}\n" + f" }}" + ) + + query = ( + f'query {{\n repository(owner: "{owner}", name: "{repo}") {{\n' + + "\n".join(object_fields) + + "\n }\n}" + ) + + try: + data = _graphql_request(token, query, {}) + except SystemExit: + continue + + repo_data = data.get("repository", {}) + for pr in chunk: + alias = f"pr{pr.number}" + commit_data = repo_data.get(alias) or {} + rollup = commit_data.get("statusCheckRollup") + if not rollup: + continue + + rollup_state = rollup.get("state", "UNKNOWN") + contexts_data = rollup.get("contexts", {}) + total_count = contexts_data.get("totalCount", 0) + contexts = contexts_data.get("nodes", []) + + summary, failed, has_test_checks = _process_check_contexts(contexts, total_count) + + if contexts and not has_test_checks: + rollup_state = "NOT_RUN" + + pr.checks_state = rollup_state + pr.check_summary = summary + pr.failed_checks = failed + + +def _fetch_commits_behind_batch(token: str, github_repository: str, prs: list[PRData]) -> dict[int, int]: + """Fetch how many commits each PR is behind its base branch in chunked GraphQL queries. + + Uses aliased ref.compare fields batched into chunks of _COMMITS_BEHIND_BATCH_SIZE + to avoid GitHub timeout errors. + Returns a dict mapping PR number to commits behind count. + """ + owner, repo = github_repository.split("/", 1) + eligible = [(i, pr) for i, pr in enumerate(prs) if pr.head_sha] + if not eligible: + return {} + + result: dict[int, int] = {} + for chunk_start in range(0, len(eligible), _COMMITS_BEHIND_BATCH_SIZE): + chunk = eligible[chunk_start : chunk_start + _COMMITS_BEHIND_BATCH_SIZE] + + compare_fields = [] + for _i, pr in chunk: + alias = f"pr{pr.number}" + compare_fields.append( + f' {alias}: ref(qualifiedName: "refs/heads/{pr.base_ref}") {{\n' + f' compare(headRef: "{pr.head_sha}") {{ behindBy }}\n' + f" }}" + ) + + query = ( + f'query {{\n repository(owner: "{owner}", name: "{repo}") {{\n' + + "\n".join(compare_fields) + + "\n }\n}" + ) + + try: + data = _graphql_request(token, query, {}) + except SystemExit: + continue + + repo_data = data.get("repository", {}) + for _i, pr in chunk: + alias = f"pr{pr.number}" + ref_data = repo_data.get(alias) or {} + compare = ref_data.get("compare") or {} + result[pr.number] = compare.get("behindBy", 0) + + return result + + +def _fetch_prs_graphql( + token: str, + github_repository: str, + labels: tuple[str, ...], + filter_user: str | None, + sort: str, + batch_size: int, +) -> list[PRData]: + """Fetch a single batch of matching PRs via GraphQL.""" + query_parts = [f"repo:{github_repository}", "type:pr", "is:open", "draft:false"] + if filter_user: + query_parts.append(f"author:{filter_user}") + for label in labels: + query_parts.append(f'label:"{label}"') + search_query = " ".join(query_parts) + + sort_field, sort_direction = sort.rsplit("-", 1) + search_query += f" sort:{sort_field}-{sort_direction}" + + get_console().print(f"[info]Searching PRs: {search_query}[/]") + + data = _graphql_request(token, _SEARCH_PRS_QUERY, {"query": search_query, "first": batch_size}) + search_data = data["search"] Review Comment: That's by design. Batching is done outside of it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
