chamikaramj commented on a change in pull request #11767:
URL: https://github.com/apache/beam/pull/11767#discussion_r596508482
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
##########
@@ -78,4 +78,23 @@
Integer getBqStreamingApiLoggingFrequencySec();
void setBqStreamingApiLoggingFrequencySec(Integer value);
+
+ @Description("If set, then BigQueryIO.Write will default to using the
Storage Write API.")
+ @Default.Boolean(false)
+ Boolean getUseStorageWriteApi();
Review comment:
Should we add such "useXYZ" options for the other read/write modes as well (to keep the API consistent)?
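For illustration, a read-side counterpart could look something like the sketch below. This is purely hypothetical: `getUseStorageReadApi` is an invented name, not something proposed in this PR.

```java
// Hypothetical companion option mirroring getUseStorageWriteApi above;
// the name and description are illustrative only.
@Description("If set, then BigQueryIO.Read will default to using the Storage Read API.")
@Default.Boolean(false)
Boolean getUseStorageReadApi();

void setUseStorageReadApi(Boolean value);
```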
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -164,6 +172,47 @@ void createDataset(
/** Patch BigQuery {@link Table} description. */
Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
throws IOException, InterruptedException;
+
+ /** Create a Write Stream for use with the Storage Write API. */
+ WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
+ throws IOException, InterruptedException;
+
+ /**
+ * Create an append client for a given Storage API write stream. The stream must be created
+ * first.
+ */
+ StreamAppendClient getStreamAppendClient(String streamName) throws Exception;
+
+ /** Flush a given stream up to the given offset. The stream must have type BUFFERED. */
Review comment:
Ping regarding previous comments for this class (feel free to just resolve if you think the suggestion is not needed).
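For reference, the lifecycle these javadocs imply looks roughly like the sketch below. It assumes a `datasetService` and `tableUrn` already in scope, and the flush call is left as a comment because its exact signature is cut off in this hunk:

```java
// Illustrative ordering only; not code from this PR.
// 1. The stream must be created before anything else.
WriteStream stream = datasetService.createWriteStream(tableUrn, WriteStream.Type.BUFFERED);

// 2. Only then can an append client be obtained for it.
StreamAppendClient appendClient = datasetService.getStreamAppendClient(stream.getName());

// 3. ... append rows through appendClient ...

// 4. BUFFERED streams only make rows visible once flushed up to an offset
//    (hypothetical call; the actual signature is elided above):
// datasetService.flush(stream.getName(), offset);
```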
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This {@link PTransform} manages loads into BigQuery using the Storage API. */
+public class StorageApiLoads<DestinationT, ElementT>
+ extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
+ private static final Logger LOG = LoggerFactory.getLogger(StorageApiLoads.class);
+ static final int FILE_TRIGGERING_RECORD_COUNT = 100;
+
+ private final Coder<DestinationT> destinationCoder;
+ private final Coder<ElementT> elementCoder;
+ private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+ private final CreateDisposition createDisposition;
+ private final String kmsKey;
+ private final Duration triggeringFrequency;
+ private final BigQueryServices bqServices;
+ private final int numShards;
+
+ public StorageApiLoads(
+ Coder<DestinationT> destinationCoder,
+ Coder<ElementT> elementCoder,
+ StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
+ CreateDisposition createDisposition,
+ String kmsKey,
+ Duration triggeringFrequency,
+ BigQueryServices bqServices,
+ int numShards) {
+ this.destinationCoder = destinationCoder;
+ this.elementCoder = elementCoder;
+ this.dynamicDestinations = dynamicDestinations;
+ this.createDisposition = createDisposition;
+ this.kmsKey = kmsKey;
+ this.triggeringFrequency = triggeringFrequency;
+ this.bqServices = bqServices;
+ this.numShards = numShards;
+ }
+
+ @Override
+ public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
+ return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input);
+ }
+
+ public WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
Review comment:
So I'm not sure why we need a triggering frequency option for the Write API. It seems similar to streaming inserts, where we can continuously write records (or batches of records), unlike the FILE_LOADS path where we have to trigger load jobs at regular intervals. I might be missing something about how the Write API works.
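For context, my reading (judging from the triggering imports in this file) is that the frequency feeds a processing-time trigger on a grouping stage, roughly like the sketch below; this is an illustration of the question, not the exact code in this PR:

```java
// Sketch: fire a pane either when enough records have buffered or when the
// configured frequency has elapsed, whichever comes first.
PCollection<KV<DestinationT, ElementT>> triggered =
    input.apply(
        "Window.Into",
        Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(triggeringFrequency))))
            .discardingFiredPanes());
```

With streaming inserts no such stage is needed, since each bundle can be written as it arrives; hence the question above.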
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
Review comment:
Add unit tests for the new RetryManager?
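A rough shape for such a test is sketched below. Since only the license header of RetryManager is visible in this hunk, the `addOperation`/`run` calls are loudly hypothetical names, not the PR's actual signatures; the point is the behavior worth pinning down.

```java
// Placeholder sketch: the RetryManager API shown here is invented for
// illustration. A real test would use the PR's actual signatures.
@Test
public void testRetriesUntilSuccess() throws Exception {
  AtomicInteger attempts = new AtomicInteger();
  RetryManager retryManager = new RetryManager(/* backoff configuration */);
  retryManager.addOperation(
      () -> {
        // Fail the first two attempts, then succeed on the third.
        if (attempts.incrementAndGet() < 3) {
          throw new RuntimeException("transient failure");
        }
      });
  retryManager.run(/* await= */ true);
  assertEquals(3, attempts.get());
}
```

Cases for exhausting the retry budget and for backoff timing would be worth covering too.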
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]