gemini-code-assist[bot] commented on code in PR #35576: URL: https://github.com/apache/beam/pull/35576#discussion_r2237864019
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java: ##########
@@ -635,38 +638,44 @@ public void process(
       // Clear the stream name, forcing a new one to be created.
       streamName.write("");
     }
-    appendClientInfo.set(
-        appendClientInfo
-            .get()
-            .withAppendClient(
-                writeStreamService,
-                getOrCreateStream,
-                false,
-                defaultMissingValueInterpretation));
-    StreamAppendClient streamAppendClient =
-        Preconditions.checkArgumentNotNull(
-            appendClientInfo.get().getStreamAppendClient());
-    String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
-    long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
-    for (AppendRowsContext context : contexts) {
-      context.streamName = streamNameRead;
-      streamAppendClient.pin();
-      context.client = appendClientInfo.get().getStreamAppendClient();
-      context.offset = currentOffset;
-      ++context.tryIteration;
-      currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
+    // Synchronize to prevent race condition with clearClients
+    synchronized (APPEND_CLIENTS) {
+      appendClientInfo.set(
+          appendClientInfo
+              .get()
+              .withAppendClient(
+                  writeStreamService,
+                  getOrCreateStream,
+                  false,
+                  defaultMissingValueInterpretation));
+      StreamAppendClient streamAppendClient =
+          Preconditions.checkArgumentNotNull(
+              appendClientInfo.get().getStreamAppendClient());
+      String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
+      long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
+      for (AppendRowsContext context : contexts) {
+        context.streamName = streamNameRead;
+        streamAppendClient.pin();
+        context.client = appendClientInfo.get().getStreamAppendClient();
+        context.offset = currentOffset;
+        ++context.tryIteration;
+        currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
+      }
+      streamOffset.write(currentOffset);
     }

Review Comment:
This `synchronized`
block holds a lock on the static `APPEND_CLIENTS` object while `withAppendClient` may perform blocking I/O (when it has to create a new client). Because the lock is static, this can become a significant performance bottleneck: every thread that needs an append client, on any shard, may end up waiting behind that I/O. To avoid this contention, consider creating the client (the I/O-bound part) outside the `synchronized` block, so that the critical section protects only the modification of shared state such as `appendClientInfo`.
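A minimal sketch of the suggested restructuring, written in plain Java so it runs on its own. `FakeClient`, `ClientCache`, `connect` and `runDemo` are hypothetical stand-ins, not Beam APIs, and `Thread.sleep` only simulates the network round trip that `withAppendClient` may make. The shared lock is held only to look up or publish a client. The slow connect runs with the lock released, and a thread that loses the publish race discards its redundant client and uses the one already published. The sketch does not model `clearClients` or the `pin()` reference counting from the diff. If the race with `clearClients` requires the client to be pinned before the lock is released, that step would belong in the second, short critical section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class LockScopeSketch {

  /** Hypothetical stand-in for an append client; not a Beam class. */
  static final class FakeClient {
    final String streamName;

    FakeClient(String streamName) {
      this.streamName = streamName;
    }
  }

  /** Hypothetical cache guarded by one shared lock, loosely modeled on a static client map. */
  static final class ClientCache {
    final AtomicInteger connects = new AtomicInteger(); // clients created
    final AtomicInteger discards = new AtomicInteger(); // clients dropped after losing a race
    private final Object lock = new Object();
    private final Map<String, FakeClient> clients = new HashMap<>();

    /** Simulated blocking I/O; the sleep stands in for a network round trip. */
    private FakeClient connect(String streamName) throws InterruptedException {
      Thread.sleep(50);
      connects.incrementAndGet();
      return new FakeClient(streamName);
    }

    FakeClient getOrCreate(String streamName) throws InterruptedException {
      // Short critical section #1: only read shared state.
      synchronized (lock) {
        FakeClient existing = clients.get(streamName);
        if (existing != null) {
          return existing;
        }
      }
      // The slow connect runs with the lock released, so callers for other
      // streams are not queued behind this I/O.
      FakeClient fresh = connect(streamName);
      // Short critical section #2: only publish the result. If another thread
      // published first, keep its client so every caller shares one instance.
      FakeClient winner;
      synchronized (lock) {
        winner = clients.putIfAbsent(streamName, fresh);
      }
      if (winner != null) {
        discards.incrementAndGet(); // a real client would be closed here, still outside the lock
        return winner;
      }
      return fresh;
    }
  }

  /** Runs 8 concurrent callers over 2 hypothetical streams and groups the clients they got. */
  static Map<String, Set<FakeClient>> runDemo(ClientCache cache) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    try {
      List<String> streams = new ArrayList<>();
      List<Future<FakeClient>> futures = new ArrayList<>();
      for (int i = 0; i < 8; i++) {
        String stream = "stream-" + (i % 2);
        Callable<FakeClient> task = () -> cache.getOrCreate(stream);
        streams.add(stream);
        futures.add(pool.submit(task));
      }
      Map<String, Set<FakeClient>> seen = new HashMap<>();
      for (int i = 0; i < futures.size(); i++) {
        seen.computeIfAbsent(streams.get(i), k -> new HashSet<>()).add(futures.get(i).get());
      }
      return seen;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    ClientCache cache = new ClientCache();
    Map<String, Set<FakeClient>> seen = runDemo(cache);
    seen.forEach((stream, clients) ->
        System.out.println(stream + ": " + clients.size() + " distinct client(s)"));
    System.out.println("connects=" + cache.connects.get() + " discards=" + cache.discards.get());
  }
}
```

The trade-off of this check, release, then publish approach is that two threads can occasionally both connect for the same stream, and one of those connections is thrown away. If that is too costly, a per-stream placeholder (for example a future stored in the map) lets later callers wait for the first creation without holding the shared lock.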