gemini-code-assist[bot] commented on code in PR #38565:
URL: https://github.com/apache/beam/pull/38565#discussion_r3274744914


##########
sdks/python/apache_beam/ml/inference/agent_development_kit.py:
##########
@@ -288,15 +279,30 @@ async def _invoke_agent(
       The text of the agent's final response, or ``None`` if the agent
       produced no final text response.
     """
+    # Check for your specific session ID
+    try:
+      # Attempt to get the specific session
+      await runner.session_service.get_session(session_id)
+    except Exception as e:
+      await runner.session_service.create_session(
+          app_name=app_name,
+          user_id=user_id,
+          session_id=session_id,
+      )

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This implementation is susceptible to a race condition. When multiple 
elements in a batch share the same `session_id`, they can all fail the 
`get_session` check concurrently and then each call `create_session`, so every 
call after the first can fail. The previous implementation explicitly handled 
`sessions.SessionExistsError` to tolerate this case. Additionally, catching a 
broad `Exception` is discouraged because it can mask unrelated issues such as 
authentication or network errors. The variable `e` is also unused.
   
   ```python
       try:
         await runner.session_service.get_session(session_id)
       except Exception:
         try:
           await runner.session_service.create_session(
               app_name=app_name,
               user_id=user_id,
               session_id=session_id,
           )
         except sessions.SessionExistsError:
           # It's okay if the session was created by a concurrent task.
           pass
   ```
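
   The race can be reproduced outside Beam. The following is a minimal sketch, 
not the ADK API: `InMemorySessionService`, its simplified method signatures, 
`ensure_session`, and the local `SessionExistsError` are hypothetical stand-ins 
used only to show why the inner `except` is needed when several tasks share one 
session ID.

```python
import asyncio


class SessionExistsError(Exception):
  """Hypothetical stand-in for ``sessions.SessionExistsError``."""


class InMemorySessionService:
  """Hypothetical session store; not the real ADK session service."""

  def __init__(self):
    self._sessions = {}

  async def get_session(self, session_id):
    await asyncio.sleep(0)  # Yield so that concurrent callers interleave.
    return self._sessions.get(session_id)

  async def create_session(self, session_id):
    await asyncio.sleep(0)
    if session_id in self._sessions:
      raise SessionExistsError(session_id)
    self._sessions[session_id] = {"id": session_id}
    return self._sessions[session_id]


async def ensure_session(service, session_id):
  """Get-or-create that tolerates losing the creation race."""
  if await service.get_session(session_id) is not None:
    return
  try:
    await service.create_session(session_id)
  except SessionExistsError:
    # A concurrent task created the session first; that is fine.
    pass


async def main():
  service = InMemorySessionService()
  # Five elements of one batch share a session ID.
  await asyncio.gather(*(ensure_session(service, "s1") for _ in range(5)))
  return len(service._sessions)


session_count = asyncio.run(main())
```

   In this sketch all five tasks miss on the lookup before any of them creates 
the session, so four of the five `create_session` calls raise. Without the 
inner `except`, the first of those errors would propagate out of `gather()`.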



##########
sdks/python/apache_beam/ml/inference/agent_development_kit.py:
##########
@@ -288,15 +279,30 @@ async def _invoke_agent(
       The text of the agent's final response, or ``None`` if the agent
       produced no final text response.
     """
+    # Check for your specific session ID
+    try:
+      # Attempt to get the specific session
+      await runner.session_service.get_session(session_id)
+    except Exception as e:
+      await runner.session_service.create_session(
+          app_name=app_name,
+          user_id=user_id,
+          session_id=session_id,
+      )
+
     async for event in runner.run_async(
         user_id=user_id,
         session_id=session_id,
         new_message=message,
     ):
       if event.is_final_response():
-        if event.content:
-          return event.content.text
-    return None
+        if event.content and event.content.parts:
+          return "".join([p.text for p in event.content.parts])
+        raise ValueError(
+            f"Agent {runner.agent.name} did not return a response, "
+            f"final event: {event}")
+
+    raise ValueError(f"Agent {runner.agent.name} did not return a response")

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Raising a `ValueError` when no final response is found (or when the final 
response has no content parts) is a significant change in behavior. Previously, 
this function returned `None`, allowing `run_inference` to yield a 
`PredictionResult` with a `None` inference. With this change, a single silent 
agent causes the entire batch/bundle to fail. If this is intended, please 
update the function's return type hint (currently `Optional[str]`) and the 
docstring (which still states it returns `None`) to reflect the new contract.
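
   The difference between the two contracts can be sketched as follows. This is 
an illustration under assumptions: `invoke_optional`, `invoke_strict`, and 
`run_batch` are hypothetical names, and the use of `asyncio.gather` to run a 
batch is assumed here rather than taken from the PR.

```python
import asyncio
from typing import Optional


async def invoke_optional(reply: Optional[str]) -> Optional[str]:
  """Old contract: a silent agent yields ``None``."""
  return reply


async def invoke_strict(reply: Optional[str]) -> str:
  """New contract: a silent agent raises ``ValueError``."""
  if reply is None:
    raise ValueError("Agent did not return a response")
  return reply


async def run_batch(invoke, replies):
  # gather() propagates the first exception, so one failure fails the batch.
  return await asyncio.gather(*(invoke(r) for r in replies))


replies = ["hello", None, "bye"]  # The second agent call is silent.

old_results = asyncio.run(run_batch(invoke_optional, replies))

try:
  asyncio.run(run_batch(invoke_strict, replies))
  batch_failed = False
except ValueError:
  batch_failed = True
```

   Under the old contract the caller receives a `None` entry alongside the two 
successful replies; under the new one the caller receives no results at all for 
that batch.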



##########
sdks/python/apache_beam/ml/inference/agent_development_kit.py:
##########
@@ -288,15 +279,30 @@ async def _invoke_agent(
       The text of the agent's final response, or ``None`` if the agent
       produced no final text response.
     """
+    # Check for your specific session ID
+    try:
+      # Attempt to get the specific session
+      await runner.session_service.get_session(session_id)
+    except Exception as e:
+      await runner.session_service.create_session(
+          app_name=app_name,
+          user_id=user_id,
+          session_id=session_id,
+      )
+
     async for event in runner.run_async(
         user_id=user_id,
         session_id=session_id,
         new_message=message,
     ):
       if event.is_final_response():
-        if event.content:
-          return event.content.text
-    return None
+        if event.content and event.content.parts:
+          return "".join([p.text for p in event.content.parts])

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
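   A generator expression can be passed to `join()` directly, which drops the 
redundant brackets. Note that in CPython `str.join` still materializes its 
argument into a sequence before joining, so any memory saving is negligible and 
the change is mainly stylistic.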
   
   ```suggestion
             return "".join(p.text for p in event.content.parts)
   ```
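
   A small sketch of the two spellings, using a hypothetical `Part` dataclass 
rather than the real ADK type. The last line additionally assumes that a part 
may carry no text (for example a non-text part), in which case filtering keeps 
`join()` from receiving `None`; whether that can happen here is not confirmed 
by the diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Part:
  """Hypothetical stand-in for a response part; not the real ADK type."""
  text: Optional[str] = None


parts = [Part("Hello, "), Part("world")]

# Both spellings produce the same string.
from_list = "".join([p.text for p in parts])
from_generator = "".join(p.text for p in parts)

# If a part could carry no text (an assumption here), skip it so that
# join() never receives None.
mixed = [Part("Hello"), Part(None), Part("!")]
text_only = "".join(p.text for p in mixed if p.text is not None)
```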



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
