This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 725db64  [BEAM-13268] Change autosharding big query inserts to happen in parallel for a bundle.
     new 5656e26  Merge pull request #16003 from [BEAM-13268] Change autosharding big query inserts to happen in parallel for a bundle.
725db64 is described below

commit 725db640331788edf23cdbe3298e6b7850e7c2e1
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Nov 17 05:12:23 2021 -0800

    [BEAM-13268] Change autosharding big query inserts to happen in parallel for a bundle.
---
 .../sdk/io/gcp/bigquery/BatchedStreamingWrite.java | 107 ++++++---------------
 1 file changed, 32 insertions(+), 75 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index dfe797e..2c1321d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -317,19 +316,44 @@ class BatchedStreamingWrite<ErrorT, ElementT>
                   Window.<KV<String, TableRowInfo<ElementT>>>into(new GlobalWindows())
                       .triggering(DefaultTrigger.of())
                       .discardingFiredPanes())
-              // Group and batch table rows such that each batch has no more than
-              // getMaxStreamingRowsToBatch rows. Also set a buffering time limit to avoid being
-              // stuck at a partial batch forever, especially in a global window.
+              // Group and batch table rows so that we flush once we have enough elements for an
+              // insert rpc batch. Also set a buffering time limit to avoid being  stuck at a
+              // partial batch forever, especially in a global window.
               .apply(
                   GroupIntoBatches.<String, TableRowInfo<ElementT>>ofSize(
                           options.getMaxStreamingRowsToBatch())
                       .withMaxBufferingDuration(maxBufferingDuration)
                       .withShardedKey())
-              .setCoder(
-                  KvCoder.of(
-                      ShardedKey.Coder.of(StringUtf8Coder.of()), IterableCoder.of(valueCoder)))
+              // Undo the batching performed and flush output using finalization.  This may build up
+              // batches of rows to insert beyond the batch size but that is handled with parallel
+              // insert requests in the underlying BQ services.
               .apply(
-                  ParDo.of(new InsertBatchedElements())
+                  ParDo.of(
+                      new DoFn<
+                          KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>,
+                          KV<String, TableRowInfo<ElementT>>>() {
+                        @ProcessElement
+                        public void apply(
+                            ProcessContext context,
+                            @Element
+                                KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> element) {
+                          String key = element.getKey().getKey();
+                          int count = 0;
+                          for (TableRowInfo<ElementT> value : element.getValue()) {
+                            context.output(KV.of(key, value));
+                            count = count + 1;
+                          }
+                          LOG.info(
+                              "Writing to BigQuery using Auto-sharding. Flushing {} rows.", count);
+                        }
+                      }))
+              .setCoder(KvCoder.of(StringUtf8Coder.of(), valueCoder))
+              // TODO(BEAM-11408): This transform requires stable inputs. Currently it relies on the
+              // fact that the upstream transform GroupIntoBatches produces stable outputs as
+              // opposed to using the annotation @RequiresStableInputs, to avoid potential
+              // performance penalty due to extra data shuffling.
+              .apply(
+                  ParDo.of(new BatchAndInsertElements())
                       .withOutputTags(
                           mainOutputTag,
                           TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
@@ -339,73 +363,6 @@ class BatchedStreamingWrite<ErrorT, ElementT>
     }
   }
 
-  // TODO(BEAM-11408): This transform requires stable inputs. Currently it relies on the fact that
-  // the upstream transform GroupIntoBatches produces stable outputs as opposed to using the
-  // annotation @RequiresStableInputs, to avoid potential performance penalty due to extra data
-  // shuffling.
-  private class InsertBatchedElements
-      extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
-    private transient @Nullable DatasetService datasetService;
-
-    private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
-      if (datasetService == null) {
-        datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
-      }
-      return datasetService;
-    }
-
-    @ProcessElement
-    public void processElement(
-        @Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> input,
-        BoundedWindow window,
-        ProcessContext context,
-        MultiOutputReceiver out)
-        throws InterruptedException, IOException {
-      List<FailsafeValueInSingleWindow<TableRow, TableRow>> tableRows = new ArrayList<>();
-      List<String> uniqueIds = new ArrayList<>();
-      for (TableRowInfo<ElementT> row : input.getValue()) {
-        TableRow tableRow = toTableRow.apply(row.tableRow);
-        TableRow failsafeTableRow = toFailsafeTableRow.apply(row.tableRow);
-        tableRows.add(
-            FailsafeValueInSingleWindow.of(
-                tableRow, context.timestamp(), window, context.pane(), failsafeTableRow));
-        uniqueIds.add(row.uniqueId);
-      }
-      LOG.info("Writing to BigQuery using Auto-sharding. Flushing {} rows.", tableRows.size());
-      BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-      TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey());
-      List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
-      List<ValueInSingleWindow<TableRow>> successfulInserts = Lists.newArrayList();
-      flushRows(
-          getDatasetService(options),
-          tableReference,
-          tableRows,
-          uniqueIds,
-          failedInserts,
-          successfulInserts);
-
-      for (ValueInSingleWindow<ErrorT> row : failedInserts) {
-        out.get(failedOutputTag).output(row.getValue());
-      }
-      for (ValueInSingleWindow<TableRow> row : successfulInserts) {
-        out.get(SUCCESSFUL_ROWS_TAG).output(row.getValue());
-      }
-      reportStreamingApiLogging(options);
-    }
-
-    @Teardown
-    public void onTeardown() {
-      try {
-        if (datasetService != null) {
-          datasetService.close();
-          datasetService = null;
-        }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
   /** Writes the accumulated rows into BigQuery with streaming API. */
   private void flushRows(
       DatasetService datasetService,

Reply via email to