gemini-code-assist[bot] commented on code in PR #35576:
URL: https://github.com/apache/beam/pull/35576#discussion_r2237864019


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -635,38 +638,44 @@ public void process(
                 // Clear the stream name, forcing a new one to be created.
                 streamName.write("");
               }
-              appendClientInfo.set(
-                  appendClientInfo
-                      .get()
-                      .withAppendClient(
-                          writeStreamService,
-                          getOrCreateStream,
-                          false,
-                          defaultMissingValueInterpretation));
-              StreamAppendClient streamAppendClient =
-                  Preconditions.checkArgumentNotNull(
-                      appendClientInfo.get().getStreamAppendClient());
-              String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
-              long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
-              for (AppendRowsContext context : contexts) {
-                context.streamName = streamNameRead;
-                streamAppendClient.pin();
-                context.client = appendClientInfo.get().getStreamAppendClient();
-                context.offset = currentOffset;
-                ++context.tryIteration;
-                currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
+              // Synchronize to prevent race condition with clearClients
+              synchronized (APPEND_CLIENTS) {
+                appendClientInfo.set(
+                    appendClientInfo
+                        .get()
+                        .withAppendClient(
+                            writeStreamService,
+                            getOrCreateStream,
+                            false,
+                            defaultMissingValueInterpretation));
+                StreamAppendClient streamAppendClient =
+                    Preconditions.checkArgumentNotNull(
+                        appendClientInfo.get().getStreamAppendClient());
+                String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
+                long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
+                for (AppendRowsContext context : contexts) {
+                  context.streamName = streamNameRead;
+                  streamAppendClient.pin();
+                  context.client = appendClientInfo.get().getStreamAppendClient();
+                  context.offset = currentOffset;
+                  ++context.tryIteration;
+                  currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
+                }
+                streamOffset.write(currentOffset);
               }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This `synchronized` block holds a lock on the static `APPEND_CLIENTS` object 
while potentially performing a blocking I/O operation inside `withAppendClient` 
(when a new client is created). This can become a significant performance 
bottleneck, as it may serialize all threads trying to get an append client 
across different shards.
   
   To avoid this contention, consider refactoring to perform the I/O-bound 
client creation outside of the synchronized block. The critical section should 
ideally only protect the modification of shared state like `appendClientInfo`.
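   The suggested narrowing of the critical section can be sketched as follows. This is a minimal, self-contained illustration, not the Beam implementation: the class, the `ExpensiveClient`-style stand-in for `withAppendClient`, and all names are hypothetical. The idea is to do the blocking creation outside the lock and only publish the result under it, so contending threads serialize on a cheap compare-and-publish rather than on I/O.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical sketch of the review suggestion; names are illustrative,
   // not Beam or BigQuery APIs.
   public class NarrowCriticalSection {
     private static final Object APPEND_CLIENTS_LOCK = new Object();
     private static final AtomicReference<String> clientInfo = new AtomicReference<>();

     // Stand-in for the blocking I/O performed when a new append client
     // is created (e.g. inside withAppendClient).
     private static String createClientWithIo() {
       return "client-" + System.nanoTime();
     }

     public static String getOrCreateClient() {
       String existing = clientInfo.get();
       if (existing != null) {
         return existing;
       }
       // I/O happens OUTSIDE the lock; several threads may race here,
       // but at most one result is published below.
       String fresh = createClientWithIo();
       synchronized (APPEND_CLIENTS_LOCK) {
         // Critical section only protects the shared-state update.
         if (clientInfo.get() == null) {
           clientInfo.set(fresh);
         }
         return clientInfo.get();
       }
     }

     public static void main(String[] args) {
       String first = getOrCreateClient();
       String second = getOrCreateClient();
       if (!first.equals(second)) {
         throw new AssertionError("expected the same published client");
       }
       System.out.println("ok");
     }
   }
   ```

   The trade-off: a losing thread may create a client it then discards, so this pattern fits best when redundant creation is safe (or when the loser can close its unused client), which would need to be verified against the actual `StreamAppendClient` lifecycle.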



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
