This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 725db64 [BEAM-13268] Change autosharding big query inserts to happen
in parallel for a bundle.
new 5656e26 Merge pull request #16003 from [BEAM-13268] Change
autosharding big query inserts to happen in parallel for a bundle.
725db64 is described below
commit 725db640331788edf23cdbe3298e6b7850e7c2e1
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Nov 17 05:12:23 2021 -0800
[BEAM-13268] Change autosharding big query inserts to happen in parallel
for a bundle.
---
.../sdk/io/gcp/bigquery/BatchedStreamingWrite.java | 107 ++++++---------------
1 file changed, 32 insertions(+), 75 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index dfe797e..2c1321d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -317,19 +316,44 @@ class BatchedStreamingWrite<ErrorT, ElementT>
Window.<KV<String, TableRowInfo<ElementT>>>into(new
GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes())
- // Group and batch table rows such that each batch has no more than
- // getMaxStreamingRowsToBatch rows. Also set a buffering time limit to avoid being
- // stuck at a partial batch forever, especially in a global window.
+ // Group and batch table rows so that we flush once we have enough elements for an
+ // insert rpc batch. Also set a buffering time limit to avoid being stuck at a
+ // partial batch forever, especially in a global window.
.apply(
GroupIntoBatches.<String, TableRowInfo<ElementT>>ofSize(
options.getMaxStreamingRowsToBatch())
.withMaxBufferingDuration(maxBufferingDuration)
.withShardedKey())
- .setCoder(
- KvCoder.of(
- ShardedKey.Coder.of(StringUtf8Coder.of()),
IterableCoder.of(valueCoder)))
+ // Undo the batching performed and flush output using finalization. This may build up
+ // batches of rows to insert beyond the batch size but that is handled with parallel
+ // insert requests in the underlying BQ services.
.apply(
- ParDo.of(new InsertBatchedElements())
+ ParDo.of(
+ new DoFn<
+ KV<ShardedKey<String>,
Iterable<TableRowInfo<ElementT>>>,
+ KV<String, TableRowInfo<ElementT>>>() {
+ @ProcessElement
+ public void apply(
+ ProcessContext context,
+ @Element
+ KV<ShardedKey<String>,
Iterable<TableRowInfo<ElementT>>> element) {
+ String key = element.getKey().getKey();
+ int count = 0;
+ for (TableRowInfo<ElementT> value :
element.getValue()) {
+ context.output(KV.of(key, value));
+ count = count + 1;
+ }
+ LOG.info(
+ "Writing to BigQuery using Auto-sharding.
Flushing {} rows.", count);
+ }
+ }))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), valueCoder))
+ // TODO(BEAM-11408): This transform requires stable inputs. Currently it relies on the
+ // fact that the upstream transform GroupIntoBatches produces stable outputs as
+ // opposed to using the annotation @RequiresStableInputs, to avoid potential
+ // performance penalty due to extra data shuffling.
+ .apply(
+ ParDo.of(new BatchAndInsertElements())
.withOutputTags(
mainOutputTag,
TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
@@ -339,73 +363,6 @@ class BatchedStreamingWrite<ErrorT, ElementT>
}
}
- // TODO(BEAM-11408): This transform requires stable inputs. Currently it relies on the fact that
- // the upstream transform GroupIntoBatches produces stable outputs as opposed to using the
- // annotation @RequiresStableInputs, to avoid potential performance penalty due to extra data
- // shuffling.
- private class InsertBatchedElements
- extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>,
Void> {
- private transient @Nullable DatasetService datasetService;
-
- private DatasetService getDatasetService(PipelineOptions pipelineOptions)
throws IOException {
- if (datasetService == null) {
- datasetService =
bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
- }
- return datasetService;
- }
-
- @ProcessElement
- public void processElement(
- @Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>
input,
- BoundedWindow window,
- ProcessContext context,
- MultiOutputReceiver out)
- throws InterruptedException, IOException {
- List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows = new
ArrayList<>();
- List<String> uniqueIds = new ArrayList<>();
- for (TableRowInfo<ElementT> row : input.getValue()) {
- TableRow tableRow = toTableRow.apply(row.tableRow);
- TableRow failsafeTableRow = toFailsafeTableRow.apply(row.tableRow);
- tableRows.add(
- FailsafeValueInSingleWindow.of(
- tableRow, context.timestamp(), window, context.pane(),
failsafeTableRow));
- uniqueIds.add(row.uniqueId);
- }
- LOG.info("Writing to BigQuery using Auto-sharding. Flushing {} rows.",
tableRows.size());
- BigQueryOptions options =
context.getPipelineOptions().as(BigQueryOptions.class);
- TableReference tableReference =
BigQueryHelpers.parseTableSpec(input.getKey().getKey());
- List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
- List<ValueInSingleWindow<TableRow>> successfulInserts =
Lists.newArrayList();
- flushRows(
- getDatasetService(options),
- tableReference,
- tableRows,
- uniqueIds,
- failedInserts,
- successfulInserts);
-
- for (ValueInSingleWindow<ErrorT> row : failedInserts) {
- out.get(failedOutputTag).output(row.getValue());
- }
- for (ValueInSingleWindow<TableRow> row : successfulInserts) {
- out.get(SUCCESSFUL_ROWS_TAG).output(row.getValue());
- }
- reportStreamingApiLogging(options);
- }
-
- @Teardown
- public void onTeardown() {
- try {
- if (datasetService != null) {
- datasetService.close();
- datasetService = null;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
/** Writes the accumulated rows into BigQuery with streaming API. */
private void flushRows(
DatasetService datasetService,