ahmedabu98 commented on code in PR #31721:
URL: https://github.com/apache/beam/pull/31721#discussion_r1665874890


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -109,6 +109,26 @@ public interface BigQueryOptions
 
   void setNumStorageWriteApiStreamAppendClients(Integer value);
 
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+          + "this option sets the minimum number of connections each pool creates. This is on a per worker, per region basis. "
+          + "Note that in practice, the minimum number of connections created is the minimum of this value and "
+          + "(numStorageWriteApiStreamAppendClients x num destinations).")
+  @Default.Integer(2)
+  Integer getMinConnectionPoolConnections();
+
+  void setMinConnectionPoolConnections(Integer value);
+
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+          + "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. "
+          + "This value should be greater than or equal to the total number of dynamic destinations, otherwise a "
+          + "race condition occurs where append operations compete over streams.")
+  @Default.Integer(20)

Review Comment:
   > Where does 20 come from?
   
   These values are copied from the existing ConnectionWorkerPool defaults (see [code ref](https://github.com/googleapis/java-bigquerystorage/blob/b255926764e0e9dac689efdd31d38a25b251296f/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java#L175-L191)). Note that this restriction only applies when a user explicitly enables multiplexing (i.e. `setUseStorageApiConnectionPool(true)`).
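
To make the sizing rules from the option descriptions in the diff concrete, here is a minimal sketch of the arithmetic. `ConnectionPoolSizing` and its methods are hypothetical names used only for illustration; they are not part of Beam or the BigQuery client, and the destination and append-client counts in `main` are made-up inputs.

```java
// Hypothetical helper for illustration only; not part of Beam or java-bigquerystorage.
final class ConnectionPoolSizing {

  // Effective minimum as worded in the option description:
  // min(configured minimum, append clients x number of destinations).
  static long effectiveMinConnections(int configuredMin, int appendClients, int numDestinations) {
    long needed = (long) appendClients * numDestinations; // widen to avoid int overflow
    return Math.min(configuredMin, needed);
  }

  // True when the pool can hold fewer connections than there are destinations,
  // which is the situation the description warns about.
  static boolean mayContend(int maxConnections, int numDestinations) {
    return maxConnections < numDestinations;
  }

  public static void main(String[] args) {
    int minConnections = 2;   // default proposed in this PR
    int maxConnections = 20;  // default proposed in this PR
    int appendClients = 1;    // assumed value, for illustration
    int numDestinations = 50; // made-up workload

    System.out.println("effective min connections: "
        + effectiveMinConnections(minConnections, appendClients, numDestinations));
    System.out.println("appends may compete over streams: "
        + mayContend(maxConnections, numDestinations));
  }
}
```

A check like this could only ever be advisory, since the number of dynamic destinations is generally not known until the pipeline runs.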
   
   > could we be more explicit about the implications of the race condition? 
   
   Good callout, will make it clearer. This only affects performance: individual append operations may fail when no streams are available, but we retry enough for records to ultimately make it to the table.
   
   > Can we detect when this race condition might occur
   
   StreamWriter (the BQ class doing append operations) will log why an append fails. In this case, we get a `PERMISSION_DENIED: Permission 'TABLES_UPDATE_DATA' denied on resource` error. Example below:
   ```
   Connection finished with error com.google.api.gax.rpc.PermissionDeniedException: 
   io.grpc.StatusRuntimeException: PERMISSION_DENIED: Permission 'TABLES_UPDATE_DATA' denied on resource
   'projects/google.com:clouddfe/datasets/ahmedabualsaud/tables/a2batch_instants_1' (or it may not exist). 
   for stream projects/google.com:clouddfe/datasets/ahmedabualsaud/tables/a2batch_instants_4/streams/_default
   with write id: 1f52a5a0-c1a3-47ee-8f44-73b7f06a8aa9
   ```
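
In the example log, the denied resource (`a2batch_instants_1`) and the stream (`a2batch_instants_4`) name different tables. Assuming that mismatch is typical of this failure mode (an assumption drawn from this one log, not documented behaviour), a log filter could flag it roughly as sketched below. `AppendErrorClassifier` is a hypothetical name, and the regular expressions only target the message shape shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log filter for illustration; not part of Beam or StreamWriter.
final class AppendErrorClassifier {

  // Table name in the quoted resource path: ...tables/<name>'
  private static final Pattern RESOURCE_TABLE = Pattern.compile("tables/([^/'\\s]+)'");
  // Table name in the stream path: ...tables/<name>/streams/...
  private static final Pattern STREAM_TABLE = Pattern.compile("tables/([^/'\\s]+)/streams/");

  // Returns true when the message is a TABLES_UPDATE_DATA permission error whose
  // denied resource and stream refer to different tables.
  static boolean looksLikeStreamContention(String logMessage) {
    if (!logMessage.contains("PERMISSION_DENIED") || !logMessage.contains("TABLES_UPDATE_DATA")) {
      return false;
    }
    Matcher resource = RESOURCE_TABLE.matcher(logMessage);
    Matcher stream = STREAM_TABLE.matcher(logMessage);
    if (!resource.find() || !stream.find()) {
      return false; // message does not have the expected shape
    }
    return !resource.group(1).equals(stream.group(1));
  }
}
```

A genuine permissions problem would presumably name the same table in both places, so this heuristic would not flag it.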


