gemini-code-assist[bot] commented on code in PR #38410:
URL: https://github.com/apache/beam/pull/38410#discussion_r3242697185


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -722,11 +713,24 @@ private void handleAppendFailure(
         // Finalize the stream and clear streamName so a new stream will be created.
         o.get(flushTag)
             .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+        // Clear streamName so a new stream will be created.
+        try {
+          streamName.write("");
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+
+        // Re-establish the client with the new stream.
+        resetClient.run();
+
         // Reinitialize all contexts with the new stream and new offsets.
-        initializeContexts.accept(failedContexts, true);
+        initializeContexts.accept(failedContexts);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Setting `streamName` to an empty string here and then immediately calling 
`initializeContexts.accept(failedContexts)` propagates the empty string to all 
contexts. Since `initializeContexts` (line 922) reads this empty string and 
assigns it to `context.streamName`, any subsequent retry of these operations 
within the same bundle will use an empty stream name. This appears to be the 
root cause of the "empty stream name encountered" issue mentioned in the PR 
title. If an offset mismatch occurs, a new valid stream name should be acquired 
and stored in the state before re-initializing the contexts, or the contexts 
should be initialized with a valid name to avoid sending empty stream names to 
the finalizer or the BigQuery service.
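
A minimal sketch of the ordering problem described above, using plain Java stand-ins rather than the Beam classes. `StateCell`, `Context`, `clearThenInit`, and `replaceThenInit` are hypothetical names invented for illustration; only the sequence of "write to state, then copy state into every context" is taken from the comment.

```java
import java.util.ArrayList;
import java.util.List;

public class ReinitOrderSketch {
  /** Hypothetical stand-in for the state cell that holds the stream name. */
  static final class StateCell {
    private String value;

    void write(String v) {
      value = v;
    }

    String read() {
      return value;
    }
  }

  /** Hypothetical stand-in for an append context. */
  static final class Context {
    String streamName = "";
  }

  static List<Context> newContexts(int n) {
    List<Context> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(new Context());
    }
    return out;
  }

  /** Copies whatever the state currently holds into every context. */
  static void initializeContexts(StateCell state, List<Context> contexts) {
    String name = state.read();
    for (Context c : contexts) {
      c.streamName = name;
    }
  }

  /** Order flagged in the review: clear the state, then re-initialize. */
  static List<Context> clearThenInit(StateCell state, int n) {
    List<Context> contexts = newContexts(n);
    state.write("");
    initializeContexts(state, contexts); // every context now holds ""
    return contexts;
  }

  /** Suggested order: store a valid new name first, then re-initialize. */
  static List<Context> replaceThenInit(StateCell state, String newStream, int n) {
    List<Context> contexts = newContexts(n);
    state.write(newStream);
    initializeContexts(state, contexts);
    return contexts;
  }

  public static void main(String[] args) {
    StateCell state = new StateCell();
    state.write("streams/old");
    System.out.println(clearThenInit(state, 2).get(0).streamName.isEmpty());
    System.out.println(replaceThenInit(state, "streams/new", 2).get(0).streamName);
  }
}
```

In this simplified model the only difference between the two paths is what the state holds at the moment the contexts are re-initialized, which is the point the comment raises.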



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -912,13 +916,9 @@ public void process(
 
         // Initialize stream names and offsets for all contexts. This will be called initially, but
         // will also be called if we roll over to a new stream on a retry.
-        BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> initializeContexts =
-            (contexts, isFailure) -> {
+        Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts =
+            (contexts) -> {
               try {
-                if (isFailure) {
-                  // Clear the stream name, forcing a new one to be created.
-                  streamName.write("");
-                }
                 String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `Preconditions.checkArgumentNotNull(streamName.read())` only verifies that 
the value is not `null`. Since `handleAppendFailure` now explicitly writes an 
empty string `""` to the state (line 719), this check will pass, and the empty 
string will be used as the stream name for all contexts. This likely leads to 
the issue where `StorageApiFinalizeWritesDoFn` encounters an empty stream name 
when it receives the output from the flush tag. Consider checking with 
`Strings.isNullOrEmpty()` and re-establishing a valid stream name when the 
state is empty.
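
The difference between a null-only check and a null-or-empty check can be sketched in plain Java as follows. `StreamNameGuard`, `requireNonNull`, `resolveStreamName`, and the `createStream` supplier are hypothetical names used only for illustration; they are not the Beam or Guava APIs, and the real fix may re-establish the stream differently.

```java
import java.util.function.Supplier;

public class StreamNameGuard {
  /** Models a null-only precondition: an empty string passes through unchanged. */
  static String requireNonNull(String value) {
    if (value == null) {
      throw new IllegalArgumentException("stream name is null");
    }
    return value;
  }

  /**
   * Treats null and "" the same way: both mean "no usable stream", so a new
   * name is obtained from the (hypothetical) createStream supplier.
   */
  static String resolveStreamName(String stored, Supplier<String> createStream) {
    if (stored == null || stored.isEmpty()) {
      return createStream.get();
    }
    return stored;
  }

  public static void main(String[] args) {
    // The null-only check lets the empty string through.
    System.out.println(requireNonNull("").isEmpty());
    // The null-or-empty check falls back to a freshly created name.
    System.out.println(resolveStreamName("", () -> "streams/new"));
    // A valid stored name is kept as-is.
    System.out.println(resolveStreamName("streams/old", () -> "streams/new"));
  }
}
```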



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -722,11 +713,24 @@ private void handleAppendFailure(
         // Finalize the stream and clear streamName so a new stream will be created.
         o.get(flushTag)
             .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+        // Clear streamName so a new stream will be created.
+        try {
+          streamName.write("");
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `try-catch` block around `streamName.write("")` is unnecessary. 
`ValueState.write` does not throw checked exceptions in the Beam SDK. 
Additionally, catching a generic `Exception` and wrapping it in a 
`RuntimeException` is discouraged when the operation is known not to throw 
checked exceptions.
   
   ```java
           // Clear streamName so a new stream will be created.
           streamName.write("");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

