Repository: beam Updated Branches: refs/heads/master ec052bb44 -> a5cbd764b
Add custom tempLocation support to BigQueryIO. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec58a80c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec58a80c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec58a80c Branch: refs/heads/master Commit: ec58a80ca0f913c85d5f17cba3535243cd010876 Parents: ec052bb Author: Yunqing Zhou <[email protected]> Authored: Fri Oct 13 15:52:18 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon Oct 16 15:17:59 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 25 ++++++++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 49 ++++++++++++++++---- 2 files changed, 61 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ec58a80c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 6d832e4..1ccd5d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import 
org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -127,11 +128,13 @@ class BatchLoads<DestinationT> private long maxFileSize; private int numFileShards; private Duration triggeringFrequency; + private ValueProvider<String> customGcsTempLocation; BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition, boolean singletonTable, DynamicDestinations<?, DestinationT> dynamicDestinations, - Coder<DestinationT> destinationCoder) { + Coder<DestinationT> destinationCoder, + ValueProvider<String> customGcsTempLocation) { bigQueryServices = new BigQueryServicesImpl(); this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; @@ -142,6 +145,7 @@ class BatchLoads<DestinationT> this.maxFileSize = DEFAULT_MAX_FILE_SIZE; this.numFileShards = DEFAULT_NUM_FILE_SHARDS; this.triggeringFrequency = null; + this.customGcsTempLocation = customGcsTempLocation; } void setTestServices(BigQueryServices bigQueryServices) { @@ -174,7 +178,16 @@ class BatchLoads<DestinationT> @Override public void validate(PipelineOptions options) { // We will use a BigQuery load job -- validate the temp location. - String tempLocation = options.getTempLocation(); + String tempLocation; + if (customGcsTempLocation == null) { + tempLocation = options.getTempLocation(); + } else { + if (!customGcsTempLocation.isAccessible()) { + // Can't perform verification in this case. 
+ return; + } + tempLocation = customGcsTempLocation.get(); + } checkArgument( !Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Write needs a GCS temp location to store temp files."); @@ -359,9 +372,15 @@ class BatchLoads<DestinationT> new DoFn<String, String>() { @ProcessElement public void getTempFilePrefix(ProcessContext c) { + String tempLocationRoot; + if (customGcsTempLocation != null) { + tempLocationRoot = customGcsTempLocation.get(); + } else { + tempLocationRoot = c.getPipelineOptions().getTempLocation(); + } String tempLocation = resolveTempLocation( - c.getPipelineOptions().getTempLocation(), + tempLocationRoot, "BigQueryWriteTemp", c.element()); LOG.info( http://git-wip-us.apache.org/repos/asf/beam/blob/ec58a80c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2f99643..3dfd8b8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -240,15 +240,6 @@ import org.slf4j.LoggerFactory; * <p>For the most general form of dynamic table destinations and schemas, look at {@link * BigQueryIO.Write#to(DynamicDestinations)}. * - * <h3>Permissions</h3> - * - * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the - * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more - * details. - * - * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control - * </a> for security and permission related information specific to BigQuery. 
- * * <h3>Insertion Method</h3> * * {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using @@ -257,6 +248,30 @@ import org.slf4j.LoggerFactory; * about the methods. The different insertion methods provide different tradeoffs of cost, quota, * and data consistency; please see BigQuery documentation for more information about these * tradeoffs. + * + * <h3>Usage with templates</h3> + * + * <p>When using {@link #read} or {@link #readTableRows()} in a template, it's required to specify + * {@link Read#withTemplateCompatibility()}. Specifying this in a non-template pipeline is not + * recommended because it has somewhat lower performance. + * + * <p>When using {@link #write()} or {@link #writeTableRows()} with batch loads in a template, it is + * recommended to specify {@link Write#withCustomGcsTempLocation}. Writing to BigQuery via batch + * loads involves writing temporary files to this location, so the location must be accessible at + * pipeline execution time. By default, this location is captured at pipeline <i>construction</i> + * time, so it may be inaccessible if the template is reused from a different project or at a moment + * when the original location no longer exists. + * {@link Write#withCustomGcsTempLocation(ValueProvider)} allows specifying the location as an + * argument to the template invocation. + * + * <h3>Permissions</h3> + * + * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the + * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more + * details. + * + * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control + * </a> for security and permission related information specific to BigQuery. 
*/ public class BigQueryIO { private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class); @@ -1031,6 +1046,8 @@ public class BigQueryIO { @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy(); + @Nullable abstract ValueProvider<String> getCustomGcsTempLocation(); + abstract Builder<T> toBuilder(); @AutoValue.Builder @@ -1059,6 +1076,8 @@ public class BigQueryIO { abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy); + abstract Builder<T> setCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation); + abstract Write<T> build(); } @@ -1303,6 +1322,15 @@ public class BigQueryIO { return toBuilder().setNumFileShards(numFileShards).build(); } + /** + * Provides a custom location on GCS for storing temporary files to be loaded via BigQuery + * batch load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for + * discussion. + */ + public Write<T> withCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation) { + return toBuilder().setCustomGcsTempLocation(customGcsTempLocation).build(); + } + @VisibleForTesting Write<T> withTestServices(BigQueryServices testServices) { return toBuilder().setBigQueryServices(testServices).build(); @@ -1479,7 +1507,8 @@ public class BigQueryIO { getCreateDisposition(), getJsonTableRef() != null, dynamicDestinations, - destinationCoder); + destinationCoder, + getCustomGcsTempLocation()); batchLoads.setTestServices(getBigQueryServices()); if (getMaxFilesPerBundle() != null) { batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
