gemini-code-assist[bot] commented on code in PR #38410:
URL: https://github.com/apache/beam/pull/38410#discussion_r3242697185
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -722,11 +713,24 @@ private void handleAppendFailure(
// Finalize the stream and clear streamName so a new stream will be created.
o.get(flushTag)
    .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+ // Clear streamName so a new stream will be created.
+ try {
+ streamName.write("");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ // Re-establish the client with the new stream.
+ resetClient.run();
+
// Reinitialize all contexts with the new stream and new offsets.
- initializeContexts.accept(failedContexts, true);
+ initializeContexts.accept(failedContexts);
Review Comment:

Setting `streamName` to an empty string here and then immediately calling
`initializeContexts(failedContexts)` propagates the empty string to all
contexts. Since `initializeContexts` (line 922) reads this empty string and
assigns it to `context.streamName`, any subsequent retry of these operations
within the same bundle will use an empty stream name. This appears to be the
root cause of the "empty stream name encountered" issue mentioned in the PR
title. If an offset mismatch occurs, a new valid stream name should be acquired
and stored in the state before re-initializing the contexts, or the contexts
should be initialized with a valid name to avoid sending empty stream names to
the finalizer or the BigQuery service.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -912,13 +916,9 @@ public void process(
// Initialize stream names and offsets for all contexts. This will be called initially, but
// will also be called if we roll over to a new stream on a retry.
- BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> initializeContexts =
-     (contexts, isFailure) -> {
+ Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts =
+     (contexts) -> {
try {
- if (isFailure) {
- // Clear the stream name, forcing a new one to be created.
- streamName.write("");
- }
String streamNameRead =
Preconditions.checkArgumentNotNull(streamName.read());
Review Comment:

`Preconditions.checkArgumentNotNull(streamName.read())` only verifies that
the value is not `null`. Since `handleAppendFailure` now explicitly writes an
empty string `""` to the state (line 719), this check will pass, and the empty
string will be used as the stream name for all contexts. This likely leads to
the issue where `StorageApiFinalizeWritesDoFn` encounters an empty stream name
when it receives the output from the flush tag. Consider checking for
`Strings.isNullOrEmpty()` and ensuring a valid stream name is re-established if
the state is empty.
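The guard the comment proposes can be sketched in isolation. `FakeValueState` and `createStream` are illustrative stand-ins rather than the Beam API; the sketch treats both `null` and `""` as "no usable stream" (the `Strings.isNullOrEmpty` semantics) and re-establishes a valid name instead of passing the empty string through:

```java
import java.util.function.Supplier;

public class StreamNameGuard {
  // Hypothetical stand-in for Beam's ValueState<String>.
  static class FakeValueState {
    private String value;
    String read() { return value; }
    void write(String v) { value = v; }
  }

  // Guard sketch: unlike a bare null check, this also rejects the empty
  // string written by handleAppendFailure and re-establishes a valid stream.
  static String resolveStreamName(FakeValueState state, Supplier<String> createStream) {
    String name = state.read();
    if (name == null || name.isEmpty()) { // Strings.isNullOrEmpty(name)
      name = createStream.get();
      state.write(name); // persist so later reads see the valid name
    }
    return name;
  }

  public static void main(String[] args) {
    FakeValueState state = new FakeValueState();
    state.write(""); // simulates the cleared state after an append failure
    String resolved = resolveStreamName(state, () -> "streams/replacement-1");
    System.out.println(resolved);
    System.out.println(state.read());
  }
}
```

A plain `checkArgumentNotNull` would accept the cleared `""` state here; the emptiness check is what forces the rollover to a fresh stream.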
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -722,11 +713,24 @@ private void handleAppendFailure(
// Finalize the stream and clear streamName so a new stream will be created.
o.get(flushTag)
    .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+ // Clear streamName so a new stream will be created.
+ try {
+ streamName.write("");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
Review Comment:

The `try-catch` block around `streamName.write("")` is unnecessary.
`ValueState.write` does not throw checked exceptions in the Beam SDK.
Additionally, catching a generic `Exception` and wrapping it in a
`RuntimeException` is discouraged when the operation is known not to throw
checked exceptions.
```java
// Clear streamName so a new stream will be created.
streamName.write("");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]