gemini-code-assist[bot] commented on code in PR #38511:
URL: https://github.com/apache/beam/pull/38511#discussion_r3249053516
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -912,13 +912,9 @@ public void process(
// Initialize stream names and offsets for all contexts. This will be called initially, but
// will also be called if we roll over to a new stream on a retry.
- BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> initializeContexts =
- (contexts, isFailure) -> {
+ Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts =
+ (contexts) -> {
try {
- if (isFailure) {
- // Clear the stream name, forcing a new one to be created.
- streamName.write("");
- }
String streamNameRead =
Preconditions.checkArgumentNotNull(streamName.read());
Review Comment:

The PR aims to fix an issue where an empty stream name is encountered in
`StorageApiFinalizeWritesDoFn`. However, after `handleAppendFailure` calls
`streamName.write("")` (line 718), this `streamName.read()` call returns that
empty string. `Preconditions.checkArgumentNotNull` only rejects `null`, so the
empty string passes and is used to initialize the contexts. Retried appends
then use an empty stream name; they fail, and an empty stream name is
eventually sent to the finalization DoFn via the `flushTag` output.
`initializeContexts` should refresh (recreate) the stream name whenever it is
null or empty, before using it to initialize the contexts.
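A minimal sketch of that guard, under a simplified in-memory model rather than the Beam code: `SimpleValueState` is a hypothetical stand-in for Beam's `ValueState<String>` (its `read()` returns `null` after `clear()`), `Context` is a hypothetical stand-in for `AppendRowsContext` with offsets omitted, and `createStream` is a hypothetical supplier standing in for however the DoFn actually creates a new write stream. It only illustrates "recreate if null or empty, then initialize the contexts".

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class InitializeContextsSketch {
  // Hypothetical stand-in for Beam's ValueState<String>: read() returns null
  // when the state has never been written or has been cleared.
  static final class SimpleValueState<T> {
    private T value;

    T read() {
      return value;
    }

    void write(T newValue) {
      value = newValue;
    }

    void clear() {
      value = null;
    }
  }

  // Hypothetical stand-in for AppendRowsContext; only the stream name is modeled.
  static final class Context {
    String streamName;
  }

  // Returns a usable stream name. If the stored name is missing (null, e.g. after
  // clear()) or empty (e.g. after write("")), a new stream is created via the
  // hypothetical createStream supplier and persisted back into the state.
  static String getOrCreateStreamName(
      SimpleValueState<String> state, Supplier<String> createStream) {
    String name = state.read();
    if (name == null || name.isEmpty()) {
      name = createStream.get();
      state.write(name);
    }
    return name;
  }

  // Mirrors the single-argument Consumer from the diff: every context receives
  // a checked, non-empty stream name before any append is retried.
  static Consumer<Iterable<Context>> initializeContexts(
      SimpleValueState<String> state, Supplier<String> createStream) {
    return contexts -> {
      String streamNameRead = getOrCreateStreamName(state, createStream);
      for (Context context : contexts) {
        context.streamName = streamNameRead;
      }
    };
  }

  public static void main(String[] args) {
    SimpleValueState<String> state = new SimpleValueState<>();
    state.write(""); // what handleAppendFailure currently leaves behind

    int[] created = {0};
    Supplier<String> createStream = () -> "stream-" + (++created[0]);

    List<Context> contexts = List.of(new Context(), new Context());
    initializeContexts(state, createStream).accept(contexts);

    System.out.println(contexts.get(0).streamName); // stream-1
    System.out.println(contexts.get(1).streamName); // stream-1
  }
}
```

Because the check treats `null` and `""` the same way, the guard works whether the failure path uses `write("")` or `clear()`.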
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -722,11 +713,20 @@ private void handleAppendFailure(
// Finalize the stream and clear streamName so a new stream will be created.
o.get(flushTag)
.output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+ // Clear streamName so a new stream will be created in resetClient below.
+ streamName.write("");
Review Comment:

Using `streamName.write("")` stores an empty string in the state. That avoids a
null value, but any read that happens before the state is refreshed with a
valid name still returns an invalid stream name. It is generally more idiomatic
in Beam to call `streamName.clear()` to mark the state as missing: subsequent
`read()` calls then return `null`, which the recreation logic can detect. Note
that `initializeContexts` currently passes the read value straight to
`Preconditions.checkArgumentNotNull`, so the recreation check has to run before
that call.
```suggestion
streamName.clear();
```
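To make the difference concrete, here is a small sketch using a hypothetical in-memory `SimpleValueState` that models the `ValueState` behavior described above (`read()` returns `null` after `clear()`). `passesNullCheck` is a hypothetical helper that reports whether the value read would get past a null-only check such as `Preconditions.checkArgumentNotNull`; the sketch does not call Beam itself.

```java
public class ClearVersusEmptySketch {
  // Hypothetical in-memory model of ValueState semantics: read() returns null
  // once the state has been cleared (or was never written).
  static final class SimpleValueState<T> {
    private T value;

    T read() {
      return value;
    }

    void write(T newValue) {
      value = newValue;
    }

    void clear() {
      value = null;
    }
  }

  // Returns true if the stored value would pass a null-only check such as
  // Preconditions.checkArgumentNotNull, which rejects null but accepts "".
  static boolean passesNullCheck(SimpleValueState<String> state) {
    return state.read() != null;
  }

  public static void main(String[] args) {
    // Current PR behavior: overwrite the old name with an empty string.
    SimpleValueState<String> emptied = new SimpleValueState<>();
    emptied.write("stream-1");
    emptied.write("");

    // Suggested behavior: clear the state so it reads as missing.
    SimpleValueState<String> cleared = new SimpleValueState<>();
    cleared.write("stream-1");
    cleared.clear();

    System.out.println(passesNullCheck(emptied)); // true: "" slips through
    System.out.println(passesNullCheck(cleared)); // false: missing state is visible
  }
}
```

With `clear()`, a missing name can no longer pass silently as a valid one. If the recreation check does not run first, the cleared state surfaces as an exception from the null check rather than as an empty stream name reaching the finalization DoFn.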
--
This is an automated message from the Apache Git Service.