gemini-code-assist[bot] commented on code in PR #38579:
URL: https://github.com/apache/beam/pull/38579#discussion_r3279784501


##########
.test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py:
##########
@@ -148,6 +203,44 @@ def enhance_workflow(workflow):
         print(f"No yaml file found for workflow: {workflow.name}")
 
 
+async def enrich_cancelled_schedule_run_jobs(workflows, semaphore, headers):
+    tasks = []
+    for workflow in workflows:
+        for run in workflow.runs:
+            if run.event != "schedule" or run.status != "cancelled":
+                continue
+            if run_duration(run.started_at, run.updated_at) > CANCELLED_FAILURE_MIN_DURATION:
+                continue
+            jobs_url = (
+                f"https://api.github.com/repos/{GIT_ORG}/beam/actions/runs/{run.id}/jobs"
+            )
+            tasks.append(fetch_jobs_count(jobs_url, run, semaphore, headers))
+
+    if tasks:
+        await asyncio.gather(*tasks)
+
+
+@backoff.on_exception(backoff.constant, aiohttp.ClientResponseError, max_tries=5)
+async def fetch_jobs_count(url, run, semaphore, headers):
+    async with semaphore:
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, headers=headers) as response:
+                if response.status == 200:
+                    result = await response.json()
+                    run.jobs_count = result.get("total_count", 0)
+                    return
+                if response.status == 403:
+                    print(f"Retry for: {url}")
+                    headers["Authorization"] = get_token()
+                raise aiohttp.ClientResponseError(
+                    response.request_info,
+                    response.history,
+                    status=response.status,
+                    message=response.reason,
+                    headers=response.headers,
+                )

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `backoff` decorator should include a `giveup` condition to avoid 
retrying on non-recoverable errors like 404 (Not Found). Also, the function 
signature should be updated to accept a shared `session` to avoid creating one 
per request.
   
   ```python
   @backoff.on_exception(
       backoff.constant,
       aiohttp.ClientResponseError,
       max_tries=5,
       giveup=lambda e: e.status not in [403, 429, 500, 502, 503, 504]
   )
   async def fetch_jobs_count(session, url, run, semaphore, headers):
       async with semaphore:
           async with session.get(url, headers=headers) as response:
               if response.status == 200:
                   result = await response.json()
                   run.jobs_count = result.get("total_count", 0)
                   return
               if response.status == 403:
                   print(f"Retry for: {url}")
                   headers["Authorization"] = get_token()
               raise aiohttp.ClientResponseError(
                   response.request_info,
                   response.history,
                   status=response.status,
                   message=response.reason,
                   headers=response.headers,
               )
   ```



##########
.test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py:
##########
@@ -148,6 +203,44 @@ def enhance_workflow(workflow):
         print(f"No yaml file found for workflow: {workflow.name}")
 
 
+async def enrich_cancelled_schedule_run_jobs(workflows, semaphore, headers):
+    tasks = []
+    for workflow in workflows:
+        for run in workflow.runs:
+            if run.event != "schedule" or run.status != "cancelled":
+                continue
+            if run_duration(run.started_at, run.updated_at) > CANCELLED_FAILURE_MIN_DURATION:
+                continue
+            jobs_url = (
+                f"https://api.github.com/repos/{GIT_ORG}/beam/actions/runs/{run.id}/jobs"
+            )
+            tasks.append(fetch_jobs_count(jobs_url, run, semaphore, headers))
+
+    if tasks:
+        await asyncio.gather(*tasks)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Creating a new `aiohttp.ClientSession` for every request is inefficient and 
can lead to resource exhaustion. It is recommended to create a single session 
and reuse it for all requests in the batch. Additionally, consider passing this 
session to `fetch_jobs_count`.
   
   ```suggestion
   async def enrich_cancelled_schedule_run_jobs(workflows, semaphore, headers):
       tasks = []
       async with aiohttp.ClientSession() as session:
           for workflow in workflows:
               for run in workflow.runs:
                   if run.event != "schedule" or run.status != "cancelled":
                       continue
                   if run_duration(run.started_at, run.updated_at) > CANCELLED_FAILURE_MIN_DURATION:
                       continue
                   jobs_url = (
                       f"https://api.github.com/repos/{GIT_ORG}/beam/actions/runs/{run.id}/jobs"
                   )
                   tasks.append(fetch_jobs_count(session, jobs_url, run, semaphore, headers))
   
           if tasks:
               await asyncio.gather(*tasks)
   ```



##########
.test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py:
##########
@@ -426,11 +528,15 @@ def append_workflow_runs(workflow, runs):
         page += 1
     print("Successfully fetched workflow runs details")
 
-    for workflow in list(workflows.values()):
+    workflows_list = list(workflows.values())
+    await enrich_cancelled_schedule_run_jobs(workflows_list, semaphore, headers)
+    for workflow in workflows_list:
         runs = sorted(workflow.runs, key=lambda r: r.started_at, reverse=True)
-        workflow.runs = runs[: int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
+        workflow.runs = prepare_workflow_runs_for_flakiness(
+            runs[: int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
+        )

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   It is more efficient to sort and slice the workflow runs before calling 
`enrich_cancelled_schedule_run_jobs`. This avoids making unnecessary API calls 
to fetch job counts for runs that will eventually be discarded by the slice.
   
   ```python
       workflows_list = list(workflows.values())
       for workflow in workflows_list:
           workflow.runs = sorted(workflow.runs, key=lambda r: r.started_at, reverse=True)[: int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
       await enrich_cancelled_schedule_run_jobs(workflows_list, semaphore, headers)
       for workflow in workflows_list:
           workflow.runs = prepare_workflow_runs_for_flakiness(workflow.runs)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to