gemini-code-assist[bot] commented on code in PR #38511:
URL: https://github.com/apache/beam/pull/38511#discussion_r3249053516


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -912,13 +912,9 @@ public void process(
 
         // Initialize stream names and offsets for all contexts. This will be called initially, but
         // will also be called if we roll over to a new stream on a retry.
-        BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> initializeContexts =
-            (contexts, isFailure) -> {
+        Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts =
+            (contexts) -> {
               try {
-                if (isFailure) {
-                  // Clear the stream name, forcing a new one to be created.
-                  streamName.write("");
-                }
                 String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The PR aims to fix an issue where an empty stream name reaches
`StorageApiFinalizeWritesDoFn`. However, after `handleAppendFailure` calls
`streamName.write("")` (line 718), this line reads that empty string and uses it
to initialize the contexts: `Preconditions.checkArgumentNotNull` rejects only
`null`, so `""` passes the check. Retried appends then use an empty stream name,
which fails and eventually sends an empty stream name to the finalization DoFn
through the `flushTag` output. `initializeContexts` should refresh (recreate) the
stream name whenever it is null or empty, before using it to initialize the
contexts.
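   A minimal sketch of the guard this comment asks for, under stated assumptions:
`InMemoryState` is a hypothetical in-memory stand-in for Beam's `ValueState<String>`
(not the Beam class), and `createStream` is a hypothetical supplier standing in for
the real Storage Write API stream creation. It shows only the control flow: a
missing or empty name is replaced and stored back before any context is initialized.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch: before initializing contexts, make sure the stored stream name is
 * usable, creating a new stream if the stored value is null or empty.
 */
final class StreamNameGuard {

  /** Hypothetical in-memory stand-in for Beam's ValueState<String>; not the Beam class. */
  static final class InMemoryState {
    private String value; // null means "no value", as after clear()

    String read() {
      return value;
    }

    void write(String newValue) {
      value = newValue;
    }

    void clear() {
      value = null;
    }
  }

  /**
   * Returns a usable stream name. If the state holds null (after clear()) or "" (after
   * write("")), a new name is taken from the hypothetical createStream supplier and written
   * back, so later reads see the fresh name instead of the invalid one.
   */
  static String ensureStreamName(InMemoryState streamName, Supplier<String> createStream) {
    String current = streamName.read();
    if (current == null || current.isEmpty()) {
      current = createStream.get();
      streamName.write(current);
    }
    return current;
  }

  private StreamNameGuard() {}
}
```

Treating `""` exactly like `null` keeps this guard correct whether the failure path
calls `write("")` or `clear()`.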



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -722,11 +713,20 @@ private void handleAppendFailure(
         // Finalize the stream and clear streamName so a new stream will be created.
         o.get(flushTag)
             .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+        // Clear streamName so a new stream will be created in resetClient below.
+        streamName.write("");

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `streamName.write("")` stores an empty string in the state. That avoids a
null value, but any read that happens before the state is refreshed with a valid
name still yields an invalid stream name. It is more idiomatic in Beam to call
`streamName.clear()` to mark the state as missing: subsequent `read()` calls then
return `null`, which recreation logic can detect. Note that `initializeContexts`
currently passes `streamName.read()` to `Preconditions.checkArgumentNotNull`, so it
would need to recreate the stream on `null` rather than throw.
   
   ```suggestion
           streamName.clear();
   ```
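   To illustrate the distinction, here is a minimal sketch with a hypothetical
in-memory stand-in for Beam's `ValueState<String>` (not Beam's own implementation)
and a null-only presence check that mirrors
`Preconditions.checkArgumentNotNull(streamName.read())`. After `write("")` the name
still looks present, so the invalid value slips through; after `clear()` it reads
as missing, which is the signal recreation logic needs.

```java
import java.util.Optional;

/**
 * Hypothetical sketch contrasting write("") with clear() on a ValueState-like holder, using a
 * null-only presence check like Preconditions.checkArgumentNotNull(streamName.read()).
 */
final class ClearVersusEmpty {

  /** Hypothetical stand-in for Beam's ValueState<String>; read() returns null when cleared. */
  static final class InMemoryState {
    private String value;

    String read() {
      return value;
    }

    void write(String newValue) {
      value = newValue;
    }

    void clear() {
      value = null;
    }
  }

  /**
   * Null-only check: an empty Optional means the name is missing and the caller should create
   * a new stream. An empty string is still reported as present.
   */
  static Optional<String> storedName(InMemoryState state) {
    return Optional.ofNullable(state.read());
  }

  private ClearVersusEmpty() {}
}
```

In the real DoFn the missing case would lead to creating a new stream rather than
returning `Optional.empty()`; the `Optional` here only makes the two outcomes easy
to compare.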



-- 
This is an automated message from the Apache Git Service.