Repository: beam
Updated Branches:
  refs/heads/master 6543e56d2 -> ad2c1f1fc
Make batch loads repeatable across different invocations of the same template job.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94d8547d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94d8547d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94d8547d

Branch: refs/heads/master
Commit: 94d8547d604dc7291cb8a72a12453a3dfe6a5f9a
Parents: 6543e56
Author: Reuven Lax <[email protected]>
Authored: Sun Apr 23 12:53:00 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon Jun 5 10:54:53 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 50 +++++++++++---------
 .../io/gcp/bigquery/WriteBundlesToFiles.java | 12 ++---
 2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/94d8547d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index c1b202e..4393a63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -60,10 +60,14 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
*/ class BatchLoads<DestinationT> extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> { + static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class); + // The maximum number of file writers to keep open in a single bundle at a time, since file // writers default to 64mb buffers. This comes into play when writing dynamic table destinations. // The first 20 tables from a single BatchLoads transform will write files inline in the @@ -161,28 +165,10 @@ class BatchLoads<DestinationT> @Override public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) { Pipeline p = input.getPipeline(); - final String stepUuid = BigQueryHelpers.randomUUIDString(); - - PCollectionView<String> tempFilePrefix = - p.apply("Create", Create.of((Void) null)) - .apply( - "GetTempFilePrefix", - ParDo.of( - new DoFn<Void, String>() { - @ProcessElement - public void getTempFilePrefix(ProcessContext c) { - c.output( - resolveTempLocation( - c.getPipelineOptions().getTempLocation(), - "BigQueryWriteTemp", - stepUuid)); - } - })) - .apply("TempFilePrefixView", View.<String>asSingleton()); // Create a singleton job ID token at execution time. This will be used as the base for all // load jobs issued from this instance of the transform. 
- PCollectionView<String> jobIdTokenView = + final PCollection<String> jobIdToken = p.apply("TriggerIdCreation", Create.of("ignored")) .apply( "CreateJobId", @@ -190,10 +176,27 @@ class BatchLoads<DestinationT> new SimpleFunction<String, String>() { @Override public String apply(String input) { - return stepUuid; + return BigQueryHelpers.randomUUIDString(); + } + })); + final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton()); + + PCollectionView<String> tempFilePrefix = jobIdToken + .apply( + "GetTempFilePrefix", + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void getTempFilePrefix(ProcessContext c) { + String tempLocation = resolveTempLocation( + c.getPipelineOptions().getTempLocation(), + "BigQueryWriteTemp", c.element()); + LOG.info("Writing BigQuery temporary files to {} before loading them.", + tempLocation); + c.output(tempLocation); } })) - .apply(View.<String>asSingleton()); + .apply("TempFilePrefixView", View.<String>asSingleton()); PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow = input.apply( @@ -210,9 +213,10 @@ class BatchLoads<DestinationT> new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {}; PCollectionTuple writeBundlesTuple = inputInGlobalWindow .apply("WriteBundlesToFiles", - ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag, + ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize)) - .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); + .withSideInputs(tempFilePrefix) + .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles = writeBundlesTuple.get(writtenFilesTag) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); 
http://git-wip-us.apache.org/repos/asf/beam/blob/94d8547d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 0b5f54b..d68779a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -19,8 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; - import com.google.api.services.bigquery.model.TableRow; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,6 +39,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +63,7 @@ class WriteBundlesToFiles<DestinationT> // Map from tablespec to a writer for that table. 
private transient Map<DestinationT, TableRowWriter> writers; private transient Map<DestinationT, BoundedWindow> writerWindows; - private final String stepUuid; + private final PCollectionView<String> tempFilePrefixView; private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag; private int maxNumWritersPerBundle; private long maxFileSize; @@ -131,11 +130,11 @@ class WriteBundlesToFiles<DestinationT> } WriteBundlesToFiles( - String stepUuid, + PCollectionView<String> tempFilePrefixView, TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag, int maxNumWritersPerBundle, long maxFileSize) { - this.stepUuid = stepUuid; + this.tempFilePrefixView = tempFilePrefixView; this.unwrittedRecordsTag = unwrittedRecordsTag; this.maxNumWritersPerBundle = maxNumWritersPerBundle; this.maxFileSize = maxFileSize; @@ -159,8 +158,7 @@ class WriteBundlesToFiles<DestinationT> @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - String tempFilePrefix = resolveTempLocation( - c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); + String tempFilePrefix = c.sideInput(tempFilePrefixView); DestinationT destination = c.element().getKey(); TableRowWriter writer;
