reuvenlax commented on code in PR #17550:
URL: https://github.com/apache/beam/pull/17550#discussion_r870727332
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -93,12 +93,21 @@
void setUseStorageWriteApiAtLeastOnce(Boolean value);
@Description(
- "If set, then BigQueryIO.Write will default to using this number of
Storage Write API streams.")
+ "If set, then BigQueryIO.Write will default to using this number of
Storage Write API streams. ")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();
void setNumStorageWriteApiStreams(Integer value);
+  @Description(
+      "The number of stream append clients indicated will be allocated at a per worker and destination "
+          + "basis. A large value can cause a large pipeline to go over the BigQuery connection quota quickly. "
+          + "With low-mid volume pipelines using the default configuration should be enough.")
+  @Default.Integer(1)
Review Comment:
A bit confusing - need to clarify that this only applies to at-least-once
writes using the default stream.
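For example, a clarified description could read something like the sketch
below. (This is only a sketch; the getter is cut off in the diff above, so the
option name shown here is hypothetical.)

```java
// Sketch of a clarified description; the option name below is hypothetical,
// since the diff cuts off before the getter declaration.
@Description(
    "Number of stream append clients allocated per worker and destination when using "
        + "the Storage Write API in at-least-once mode with the default stream. A large "
        + "value can quickly exceed the BigQuery connection quota on big pipelines; the "
        + "default is usually sufficient for low- to mid-volume pipelines.")
@Default.Integer(1)
Integer getNumStorageWriteApiStreamAppendClients();

void setNumStorageWriteApiStreamAppendClients(Integer value);
```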
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -197,6 +204,10 @@ String getDefaultStreamName() {
     return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
   }
+  String getStreamAppendClientCacheEntryName() {
+    return getDefaultStreamName() + "-client" + clientNumber;
+  }
Review Comment:
This is a bit weird, since this code doesn't always use the default stream.
The cache is probably not needed in the non-default-stream case (since we
create a new stream for every bundle). However, if we change that, we need to
rename the cache and also make sure to close the client explicitly, since
right now we rely on the cache removal listener to close it.
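If the cache is dropped for the non-default-stream case, the explicit close
could look roughly like the sketch below. (A sketch only; it reuses
`datasetService`, `streamName`, and `descriptorWrapper` from the quoted diff
and elides the actual append logic.)

```java
// Sketch: a fresh stream is created per bundle in the non-default-stream
// case, so the client can be closed directly instead of relying on the
// cache's removal listener.
StreamAppendClient appendClient =
    datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
try {
  // ... append this bundle's rows ...
} finally {
  appendClient.close(); // assumes StreamAppendClient is closeable
}
```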
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -221,14 +242,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
     if (lookupCache) {
       this.streamAppendClient =
           APPEND_CLIENTS.get(
-              streamName,
-              () ->
-                  datasetService.getStreamAppendClient(
-                      streamName, descriptorWrapper.descriptor));
+              getStreamAppendClientCacheEntryName(), () -> createStreamAppendClient());
Review Comment:
It will also be broken if you have two sinks in the pipeline, one using the
default stream and one not.
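One way to avoid the collision would be to key the cache by the stream
actually in use. (A sketch only; `usingDefaultStream` and `streamName` are
assumed fields of the surrounding class, while `getDefaultStreamName()` and
`clientNumber` come from the quoted diff.)

```java
// Sketch: derive the cache key from whichever stream this writer targets,
// so a default-stream sink and an explicit-stream sink get distinct entries.
String getStreamAppendClientCacheEntryName() {
  return (usingDefaultStream ? getDefaultStreamName() : streamName)
      + "-client" + clientNumber;
}
```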
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -221,14 +242,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
     if (lookupCache) {
       this.streamAppendClient =
           APPEND_CLIENTS.get(
-              streamName,
-              () ->
-                  datasetService.getStreamAppendClient(
-                      streamName, descriptorWrapper.descriptor));
+              getStreamAppendClientCacheEntryName(), () -> createStreamAppendClient());
Review Comment:
getStreamAppendClientCacheEntryName doesn't necessarily return the name of
the stream we actually used.
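A sketch of keeping the cache key and the created client in agreement, so the
cached entry always matches the stream it appends to. (The `cacheKey` local is
illustrative; the other identifiers come from the quoted diff.)

```java
// Sketch: compute the key from the same stream name the loader uses, so the
// cached client and the key can never refer to different streams.
String cacheKey = streamName + "-client" + clientNumber;
this.streamAppendClient =
    APPEND_CLIENTS.get(
        cacheKey,
        () ->
            datasetService.getStreamAppendClient(
                streamName, descriptorWrapper.descriptor));
```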
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java:
##########
@@ -50,14 +49,16 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
     BigQueryOptions bigQueryOptions =
         input.getPipeline().getOptions().as(BigQueryOptions.class);
     // Append records to the Storage API streams.
     input.apply(
-        "Write Records",
Review Comment:
Changing transform names can affect update compatibility - do you need this?
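If the rename isn't needed, keeping the original step name preserves update
compatibility. (A sketch only; `writeRecordsTransform` is a placeholder for
whatever the apply currently wraps.)

```java
// Sketch: reusing the pre-existing step name so a pipeline update can match
// this transform against the old graph.
input.apply("Write Records", writeRecordsTransform);
```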