gemini-code-assist[bot] commented on code in PR #36164:
URL: https://github.com/apache/beam/pull/36164#discussion_r2350308956


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java:
##########
@@ -94,15 +96,17 @@ List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk) {
    */
   private Optional<AssembledWorkItem> flushToWorkItem() {
     try {
+      workItemBuilder.mergeFrom(data);
       return Optional.of(
           AssembledWorkItem.create(
-              WorkItem.parseFrom(data.newInput()),
+              workItemBuilder.build(),
               Preconditions.checkNotNull(metadata),
               workTimingInfosTracker.getLatencyAttributions(),
               bufferedSize));

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Replacing `WorkItem.parseFrom(data.newInput())` with
`workItemBuilder.build()` changes exception handling in a subtle but critical
way. `WorkItem.parseFrom()` throws an `InvalidProtocolBufferException` (a
subclass of `IOException`) when a required field is missing, and the existing
`catch (IOException e)` block catches it.
   
   `workItemBuilder.build()`, by contrast, throws an
`UninitializedMessageException`, which is a `RuntimeException` and is therefore
not caught by that block. The exception can propagate and crash the worker
thread for data that was previously handled gracefully.
   
   To preserve the original error handling, use `buildPartial()`, check whether
the message is initialized, and throw an `InvalidProtocolBufferException` if it
is not. This mirrors the behavior of `parseFrom()`.
   
   ```java
         workItemBuilder.mergeFrom(data);
         WorkItem workItem = workItemBuilder.buildPartial();
         if (!workItem.isInitialized()) {
           throw new org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException(
               "WorkItem not initialized").setUnfinishedMessage(workItem);
         }
         return Optional.of(
             AssembledWorkItem.create(
                 workItem,
                 Preconditions.checkNotNull(metadata),
                 workTimingInfosTracker.getLatencyAttributions(),
                 bufferedSize));
   ```
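
The difference between the two failure modes can be reproduced without protobuf on the classpath. The following is a minimal sketch, not the Beam or protobuf code: `InvalidMessageException`, `UninitializedException`, and `FakeBuilder` are hypothetical stand-ins for `InvalidProtocolBufferException`, `UninitializedMessageException`, and the generated `WorkItem.Builder`. It only illustrates that a `catch (IOException e)` block handles the checked exception and lets the unchecked one escape.

```java
import java.io.IOException;
import java.util.Optional;

class FlushExceptionSketch {
  /** Hypothetical stand-in for protobuf's InvalidProtocolBufferException (checked, an IOException). */
  static class InvalidMessageException extends IOException {
    InvalidMessageException(String message) { super(message); }
  }

  /** Hypothetical stand-in for protobuf's UninitializedMessageException (unchecked). */
  static class UninitializedException extends RuntimeException {
    UninitializedException(String message) { super(message); }
  }

  /** Hypothetical stand-in for a generated builder with one required field. */
  static class FakeBuilder {
    private boolean requiredFieldSet;

    // Declared to throw a checked exception, as a merge from raw bytes would be.
    void mergeFrom(boolean dataHasRequiredField) throws InvalidMessageException {
      requiredFieldSet = dataHasRequiredField;
    }

    boolean isInitialized() { return requiredFieldSet; }

    // Fails with an unchecked exception when the required field is missing.
    String build() {
      if (!requiredFieldSet) throw new UninitializedException("missing required field");
      return "message";
    }

    // Never throws; the caller must check isInitialized() itself.
    String buildPartial() { return requiredFieldSet ? "message" : "partial message"; }
  }

  /** Shape of the PR as submitted: build() inside a try that only catches IOException. */
  static Optional<String> flushWithBuild(boolean dataHasRequiredField) {
    FakeBuilder builder = new FakeBuilder();
    try {
      builder.mergeFrom(dataHasRequiredField);
      return Optional.of(builder.build()); // the unchecked exception escapes the catch below
    } catch (IOException e) {
      return Optional.empty();
    }
  }

  /** Shape of the suggestion: check initialization and throw the checked exception. */
  static Optional<String> flushWithBuildPartial(boolean dataHasRequiredField) {
    FakeBuilder builder = new FakeBuilder();
    try {
      builder.mergeFrom(dataHasRequiredField);
      if (!builder.isInitialized()) {
        throw new InvalidMessageException("message not initialized");
      }
      return Optional.of(builder.buildPartial());
    } catch (IOException e) {
      return Optional.empty(); // handled gracefully, as with parseFrom()
    }
  }

  public static void main(String[] args) {
    System.out.println(flushWithBuildPartial(true).isPresent());  // prints "true"
    System.out.println(flushWithBuildPartial(false).isPresent()); // prints "false"
    try {
      flushWithBuild(false);
    } catch (UninitializedException e) {
      System.out.println("escaped: " + e.getMessage()); // prints "escaped: missing required field"
    }
  }
}
```

In this sketch the `buildPartial()` variant turns a missing required field into an empty result, while the `build()` variant lets the exception reach the caller.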



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
