Fully general dynamic tables (including schemas) in BigQueryIO.
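This commit replaces the constant-schema function previously hard-coded in BatchLoads with a new DynamicDestinations abstraction: BigQueryIO.Write#to(DynamicDestinations) lets every element choose both its destination table and that table's schema, and BigQueryIO.Write#withSchemaFromView lets schemas be computed by an earlier pipeline stage. A rough sketch of the intended usage, written in the style of the class-level javadoc examples below -- the Event type, its accessors, and the table naming are hypothetical, and the getDestination signature is assumed from the new DynamicDestinations.java, whose full contents are not excerpted here:

    // Route each event to a per-user table, with a per-table schema.
    PCollection<Event> events = ...;  // Event is a hypothetical user type
    events.apply(BigQueryIO.<Event>write()
        .to(new DynamicDestinations<Event, String>() {
          // Pick a destination key for each element; here, the user id.
          @Override
          public String getDestination(ValueInSingleWindow<Event> element) {
            return element.getValue().getUserId();
          }
          // Map the destination key to a concrete table and description.
          @Override
          public TableDestination getTable(String user) {
            return new TableDestination(
                "my-project:events." + user, "Events for user " + user);
          }
          // Supply a per-destination schema; may be null under CREATE_NEVER.
          @Override
          public TableSchema getSchema(String user) {
            return new TableSchema().setFields(ImmutableList.of(
                new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                new TableFieldSchema().setName("payload").setType("STRING")));
          }
        })
        .withFormatFunction(new SerializableFunction<Event, TableRow>() {
          @Override
          public TableRow apply(Event event) {
            return new TableRow()
                .set("timestamp", event.getTimestamp())
                .set("payload", event.getPayload());
          }
        }));

Because DestinationT is String here, expandTyped() can resolve a default coder from the CoderRegistry; a custom destination type needs a coder discoverable through getDestinationCoderWithDefault. Side inputs returned by DynamicDestinations.getSideInputs() are threaded through CalculateSchemas, CreateTables, and WriteTables, so getTable() and getSchema() may consult materialized views.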
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35db7457 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35db7457 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35db7457 Branch: refs/heads/master Commit: 35db7457bf8c95ac693c2c36c5f8909afc3f16ab Parents: 17f0843 Author: Reuven Lax <[email protected]> Authored: Wed Apr 19 07:56:52 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed May 3 16:06:47 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 129 +++--- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 74 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 422 +++++++++++-------- .../sdk/io/gcp/bigquery/CalculateSchemas.java | 79 ++++ .../beam/sdk/io/gcp/bigquery/CreateTables.java | 65 ++- .../io/gcp/bigquery/DynamicDestinations.java | 178 ++++++++ .../bigquery/DynamicDestinationsHelpers.java | 192 +++++++++ .../beam/sdk/io/gcp/bigquery/PrepareWrite.java | 36 +- .../sdk/io/gcp/bigquery/StreamingInserts.java | 59 +-- .../io/gcp/bigquery/StreamingWriteTables.java | 8 + .../io/gcp/bigquery/TableDestinationCoder.java | 2 +- .../io/gcp/bigquery/WriteBundlesToFiles.java | 56 +-- .../sdk/io/gcp/bigquery/WritePartition.java | 69 ++- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 112 ++--- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 336 +++++++++++---- 15 files changed, 1241 insertions(+), 576 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 593c580..4e14696 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -21,26 +21,24 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import 
org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; @@ -58,27 +56,31 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */ -class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> { - BigQueryIO.Write<?> write; +class BatchLoads<DestinationT> + extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> { + private BigQueryServices bigQueryServices; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; + // Indicates that we are writing to a constant single table. If this is the case, we will create + // the table, even if there is no data in it. + private final boolean singletonTable; + private final DynamicDestinations<?, DestinationT> dynamicDestinations; + private final Coder<DestinationT> destinationCoder; - private static class ConstantSchemaFunction - implements SerializableFunction<TableDestination, TableSchema> { - private final @Nullable ValueProvider<String> jsonSchema; - - ConstantSchemaFunction(ValueProvider<String> jsonSchema) { - this.jsonSchema = jsonSchema; - } - - @Override - @Nullable - public TableSchema apply(TableDestination table) { - return BigQueryHelpers.fromJsonString( - jsonSchema == null ? null : jsonSchema.get(), TableSchema.class); - } + BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition, + boolean singletonTable, + DynamicDestinations<?, DestinationT> dynamicDestinations, + Coder<DestinationT> destinationCoder) { + bigQueryServices = new BigQueryServicesImpl(); + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; + this.singletonTable = singletonTable; + this.dynamicDestinations = dynamicDestinations; + this.destinationCoder = destinationCoder; } - BatchLoads(BigQueryIO.Write<?> write) { - this.write = write; + void setTestServices(BigQueryServices bigQueryServices) { + this.bigQueryServices = bigQueryServices; } @Override @@ -88,7 +90,7 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, checkArgument( !Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Write needs a GCS temp location to store temp files."); - if (write.getBigQueryServices() == null) { + if (bigQueryServices == null) { try { GcsPath.fromUri(tempLocation); } catch (IllegalArgumentException e) { @@ -102,7 +104,7 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, } @Override - public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) { + public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) { Pipeline p = input.getPipeline(); BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); @@ -137,28 +139,32 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, })) .apply(View.<String>asSingleton()); - PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow = + PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow = input.apply( "rewindowIntoGlobal", - Window.<KV<TableDestination, TableRow>>into(new GlobalWindows()) + Window.<KV<DestinationT, TableRow>>into(new GlobalWindows()) .triggering(DefaultTrigger.of()) .discardingFiredPanes()); + PCollectionView<Map<DestinationT, String>> schemasView = + inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations)); // PCollection of filename, file byte 
size, and table destination. - PCollection<WriteBundlesToFiles.Result> results = + PCollection<WriteBundlesToFiles.Result<DestinationT>> results = inputInGlobalWindow - .apply("WriteBundlesToFiles", ParDo.of(new WriteBundlesToFiles(tempFilePrefix))) - .setCoder(WriteBundlesToFiles.ResultCoder.of()); + .apply("WriteBundlesToFiles", ParDo.of( + new WriteBundlesToFiles<DestinationT>(tempFilePrefix))) + .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag = - new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {}; - TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag = - new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {}; + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag = + new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {}; + TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag = + new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {}; // Turn the list of files and record counts in a PCollectionView that can be used as a // side input. - PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = - results.apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable()); + PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView = + results.apply("ResultsView", + View.<WriteBundlesToFiles.Result<DestinationT>>asIterable()); // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for // loading. @@ -166,24 +172,23 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, singleton.apply( "WritePartition", ParDo.of( - new WritePartition( - write.getJsonTableRef(), - write.getTableDescription(), + new WritePartition<>( + singletonTable, resultsView, multiPartitionsTag, singlePartitionTag)) .withSideInputs(resultsView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant - // schema function here. If no schema is specified, this function will return null. - // TODO: Turn this into a side-input instead. - SerializableFunction<TableDestination, TableSchema> schemaFunction = - new ConstantSchemaFunction(write.getJsonSchema()); + List<PCollectionView<?>> writeTablesSideInputs = + Lists.newArrayList(jobIdTokenView, schemasView); + writeTablesSideInputs.addAll(dynamicDestinations.getSideInputs()); + + Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder = + KvCoder.of( + ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), + ListCoder.of(StringUtf8Coder.of())); - Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder = - KvCoder.of( - ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of())); // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then // the import needs to be split into multiple partitions, and those partitions will be // specified in multiPartitionsTag. @@ -195,19 +200,20 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, // reexecution of the WritePartitions step once WriteTables has begun. 
.apply( "MultiPartitionsReshuffle", - Reshuffle.<ShardedKey<TableDestination>, List<String>>of()) + Reshuffle.<ShardedKey<DestinationT>, List<String>>of()) .apply( "MultiPartitionsWriteTables", ParDo.of( - new WriteTables( + new WriteTables<>( false, - write.getBigQueryServices(), + bigQueryServices, jobIdTokenView, + schemasView, tempFilePrefix, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, - schemaFunction)) - .withSideInputs(jobIdTokenView)); + dynamicDestinations)) + .withSideInputs(writeTablesSideInputs)); // This view maps each final table destination to the set of temporary partitioned tables // the PCollection was loaded into. @@ -218,10 +224,10 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, "WriteRename", ParDo.of( new WriteRename( - write.getBigQueryServices(), + bigQueryServices, jobIdTokenView, - write.getWriteDisposition(), - write.getCreateDisposition(), + writeDisposition, + createDisposition, tempTablesView)) .withSideInputs(tempTablesView, jobIdTokenView)); @@ -232,19 +238,20 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, // Reshuffle will distribute this among multiple workers, and also guard against // reexecution of the WritePartitions step once WriteTables has begun. .apply( - "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<TableDestination>, List<String>>of()) + "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of()) .apply( "SinglePartitionWriteTables", ParDo.of( - new WriteTables( + new WriteTables<>( true, - write.getBigQueryServices(), + bigQueryServices, jobIdTokenView, + schemasView, tempFilePrefix, - write.getWriteDisposition(), - write.getCreateDisposition(), - schemaFunction)) - .withSideInputs(jobIdTokenView)); + writeDisposition, + createDisposition, + dynamicDestinations)) + .withSideInputs(writeTablesSideInputs)); return WriteResult.in(input.getPipeline()); } http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 3850cbd..70e7a5f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -41,9 +41,7 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; -/** - * A set of helper functions and classes used by {@link BigQueryIO}. - */ +/** A set of helper functions and classes used by {@link BigQueryIO}. */ public class BigQueryHelpers { private static final String RESOURCE_NOT_FOUND_ERROR = "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline" @@ -55,9 +53,7 @@ public class BigQueryHelpers { + " an earlier stage of the pipeline, this validation can be disabled using" + " #withoutValidation."; - /** - * Status of a BigQuery job or request. - */ + /** Status of a BigQuery job or request. 
*/ enum Status { SUCCEEDED, FAILED, @@ -65,20 +61,15 @@ public class BigQueryHelpers { } @Nullable - /** - * Return a displayable string representation for a {@link TableReference}. - */ - static ValueProvider<String> displayTable( - @Nullable ValueProvider<TableReference> table) { + /** Return a displayable string representation for a {@link TableReference}. */ + static ValueProvider<String> displayTable(@Nullable ValueProvider<TableReference> table) { if (table == null) { return null; } return NestedValueProvider.of(table, new TableRefToTableSpec()); } - /** - * Returns a canonical string representation of the {@link TableReference}. - */ + /** Returns a canonical string representation of the {@link TableReference}. */ public static String toTableSpec(TableReference ref) { StringBuilder sb = new StringBuilder(); if (ref.getProjectId() != null) { @@ -100,8 +91,8 @@ public class BigQueryHelpers { } /** - * Parse a table specification in the form - * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}. + * Parse a table specification in the form {@code "[project_id]:[dataset_id].[table_id]"} or + * {@code "[dataset_id].[table_id]"}. * * <p>If the project id is omitted, the default project id is used. */ @@ -110,7 +101,8 @@ public class BigQueryHelpers { if (!match.matches()) { throw new IllegalArgumentException( "Table reference is not in [project_id]:[dataset_id].[table_id] " - + "format: " + tableSpec); + + "format: " + + tableSpec); } TableReference ref = new TableReference(); @@ -164,8 +156,7 @@ public class BigQueryHelpers { return BigQueryIO.JSON_FACTORY.fromString(json, clazz); } catch (IOException e) { throw new RuntimeException( - String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), - e); + String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), e); } } @@ -178,9 +169,7 @@ public class BigQueryHelpers { return UUID.randomUUID().toString().replaceAll("-", ""); } - static void verifyTableNotExistOrEmpty( - DatasetService datasetService, - TableReference tableRef) { + static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) { try { if (datasetService.getTable(tableRef) != null) { checkState( @@ -193,8 +182,7 @@ public class BigQueryHelpers { Thread.currentThread().interrupt(); } throw new RuntimeException( - "unable to confirm BigQuery table emptiness for table " - + toTableSpec(tableRef), e); + "unable to confirm BigQuery table emptiness for table " + toTableSpec(tableRef), e); } } @@ -206,12 +194,12 @@ public class BigQueryHelpers { if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { throw new IllegalArgumentException( String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", toTableSpec(table)), e); - } else if (e instanceof RuntimeException) { + } else if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { throw new RuntimeException( - String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", - toTableSpec(table)), + String.format( + UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)), e); } } @@ -225,12 +213,13 @@ public class BigQueryHelpers { if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { throw new IllegalArgumentException( String.format(RESOURCE_NOT_FOUND_ERROR, "table", toTableSpec(table)), e); - } else if (e instanceof RuntimeException) { + } else if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { throw new RuntimeException( - 
String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", - toTableSpec(table)), e); + String.format( + UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", toTableSpec(table)), + e); } } } @@ -248,40 +237,42 @@ public class BigQueryHelpers { } @VisibleForTesting - static class JsonSchemaToTableSchema - implements SerializableFunction<String, TableSchema> { + static class JsonSchemaToTableSchema implements SerializableFunction<String, TableSchema> { @Override public TableSchema apply(String from) { return fromJsonString(from, TableSchema.class); } } - static class TableSchemaToJsonSchema - implements SerializableFunction<TableSchema, String> { + static class TableSchemaToJsonSchema implements SerializableFunction<TableSchema, String> { @Override public String apply(TableSchema from) { return toJsonString(from); } } - static class JsonTableRefToTableRef - implements SerializableFunction<String, TableReference> { + static class JsonTableRefToTableRef implements SerializableFunction<String, TableReference> { @Override public TableReference apply(String from) { return fromJsonString(from, TableReference.class); } } - static class TableRefToTableSpec - implements SerializableFunction<TableReference, String> { + static class JsonTableRefToTableSpec implements SerializableFunction<String, String> { + @Override + public String apply(String from) { + return toTableSpec(fromJsonString(from, TableReference.class)); + } + } + + static class TableRefToTableSpec implements SerializableFunction<TableReference, String> { @Override public String apply(TableReference from) { return toTableSpec(from); } } - static class TableRefToJson - implements SerializableFunction<TableReference, String> { + static class TableRefToJson implements SerializableFunction<TableReference, String> { @Override public String apply(TableReference from) { return toJsonString(from); @@ -289,8 +280,7 @@ public class BigQueryHelpers { } @VisibleForTesting - static class TableSpecToTableRef - implements SerializableFunction<String, TableReference> { + static class TableSpecToTableRef implements SerializableFunction<String, TableReference> { @Override public TableReference apply(String from) { return parseTableSpec(from); http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2ff5cd7..29491d8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -32,15 +32,18 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicates; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; - +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; - +import 
org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -52,6 +55,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSche import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -68,6 +74,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,12 +93,11 @@ import org.slf4j.LoggerFactory; * <li>{@code tableId}: a table id, unique within a dataset. * </ul> * - * <p>BigQuery table references are stored as a {@link TableReference}, which comes - * from the <a href="https://cloud.google.com/bigquery/client-libraries"> - * BigQuery Java Client API</a>. - * Tables can be referred to as Strings, with or without the {@code projectId}. - * A helper function is provided ({@link BigQueryHelpers#parseTableSpec(String)}) - * that parses the following string forms into a {@link TableReference}: + * <p>BigQuery table references are stored as a {@link TableReference}, which comes from the <a + * href="https://cloud.google.com/bigquery/client-libraries">BigQuery Java Client API</a>. Tables + * can be referred to as Strings, with or without the {@code projectId}. A helper function is + * provided ({@link BigQueryHelpers#parseTableSpec(String)}) that parses the following string forms + * into a {@link TableReference}: * * <ul> * <li>[{@code project_id}]:[{@code dataset_id}].[{@code table_id}] @@ -102,6 +108,7 @@ import org.slf4j.LoggerFactory; * * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. * This produces a {@link PCollection} of {@link TableRow TableRows} as output: + * * <pre>{@code * PCollection<TableRow> weatherData = pipeline.apply( * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations")); @@ -129,6 +136,7 @@ import org.slf4j.LoggerFactory; * {@link BigQueryIO#write()}. When using a user-defined type, a function must be provided to * turn this type into a {@link TableRow} using * {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}. + * * <pre>{@code * PCollection<TableRow> quotes = ... * @@ -143,19 +151,18 @@ import org.slf4j.LoggerFactory; * .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); * }</pre> * - * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should - * append to an existing table, replace the table, or verify that the table is - * empty. Note that the dataset being written to must already exist. 
Unbounded PCollections can only - be written using {@link Write.WriteDisposition#WRITE_EMPTY} or - {@link Write.WriteDisposition#WRITE_APPEND}. + * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should append to an + * existing table, replace the table, or verify that the table is empty. Note that the dataset being + * written to must already exist. Unbounded PCollections can only be written using {@link + * Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}. * * <h3>Sharding BigQuery output tables</h3> * - * <p>A common use case is to dynamically generate BigQuery table names based on - * the current window or the current value. To support this, - * {@link BigQueryIO.Write#to(SerializableFunction)} - * accepts a function mapping the current element to a tablespec. For example, - * here's code that outputs daily tables to BigQuery: + * <p>A common use case is to dynamically generate BigQuery table names based on the current window + * or the current value. To support this, {@link BigQueryIO.Write#to(SerializableFunction)} accepts + * a function mapping the current element to a tablespec. For example, here's code that outputs + * daily tables to BigQuery: + * * <pre>{@code * PCollection<TableRow> quotes = ... * quotes.apply(Window.<TableRow>into(CalendarWindows.days(1))) @@ -173,20 +180,31 @@ import org.slf4j.LoggerFactory; * }</pre> * * <p>Note that this also allows the table to be a function of the element as well as the current - * pane, in the case of triggered windows. In this case it might be convenient to call - * {@link BigQueryIO#write()} directly instead of using the {@link BigQueryIO#writeTableRows()} - * helper. This will allow the mapping function to access the element of the user-defined type. - * In this case, a formatting function must be specified using - * {@link BigQueryIO.Write#withFormatFunction} to convert each element into a {@link TableRow} - * object. + * pane, in the case of triggered windows. In this case it might be convenient to call {@link + * BigQueryIO#write()} directly instead of using the {@link BigQueryIO#writeTableRows()} helper. + * This will allow the mapping function to access the element of the user-defined type. In this + * case, a formatting function must be specified using {@link BigQueryIO.Write#withFormatFunction} + * to convert each element into a {@link TableRow} object. * - * <p>Per-value tables currently do not perform well in batch mode. + * <p>Per-table schemas can also be provided using {@link BigQueryIO.Write#withSchemaFromView}. This + * allows the schemas to be calculated based on a previous pipeline stage or statically via a + * {@link org.apache.beam.sdk.transforms.Create} transform. This method expects to receive a + * map-valued {@link PCollectionView}, mapping table specifications (project:dataset.table-id) to + * JSON-formatted {@link TableSchema} objects. All destination tables must be present in this map, + * or the pipeline will fail to create tables. Care should be taken if the map value is based on a + * triggered aggregation over an unbounded {@link PCollection}; the side input will contain the + * entire history of all table schemas ever generated, which might blow up memory usage. This method + * can also be useful when writing to a single table, as it allows a previous stage to calculate the + * schema (possibly based on the full collection of records being written to BigQuery).
+ * + * <p>For the most general form of dynamic table destinations and schemas, look at + * {@link BigQueryIO.Write#to(DynamicDestinations)}. * * <h3>Permissions</h3> * * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the - * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for - * more details. + * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more + * details. * * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control * </a> for security and permission related information specific to BigQuery. @@ -195,8 +213,7 @@ public class BigQueryIO { private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class); /** - * Singleton instance of the JSON factory used to read and write JSON - * formatted rows. + * Singleton instance of the JSON factory used to read and write JSON formatted rows. */ static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); @@ -232,13 +249,13 @@ public class BigQueryIO { * A formatting function that maps a TableRow to itself. This allows sending a * {@code PCollection<TableRow>} directly to BigQueryIO.Write. */ - static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = + static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = new SerializableFunction<TableRow, TableRow>() { - @Override - public TableRow apply(TableRow input) { - return input; - } - }; + @Override + public TableRow apply(TableRow input) { + return input; + } + }; /** * A {@link PTransform} that reads from a BigQuery table and returns a @@ -246,6 +263,7 @@ public class BigQueryIO { * * <p>Each {@link TableRow} contains values indexed by column name. Here is a * sample processing function that processes a "line" column from rows: + * * <pre>{@code * static class ExtractWordsFn extends DoFn<TableRow, String> { * public void processElement(ProcessContext c) { @@ -258,7 +276,8 @@ public class BigQueryIO { * } * } * } - * }}</pre> + * } + * }</pre> */ public static Read read() { return new AutoValue_BigQueryIO_Read.Builder() @@ -276,7 +295,6 @@ public class BigQueryIO { @Nullable abstract Boolean getFlattenResults(); @Nullable abstract Boolean getUseLegacySql(); abstract BigQueryServices getBigQueryServices(); - abstract Builder toBuilder(); @AutoValue.Builder @@ -287,7 +305,6 @@ public class BigQueryIO { abstract Builder setFlattenResults(Boolean flattenResults); abstract Builder setUseLegacySql(Boolean useLegacySql); abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); - abstract Read build(); } @@ -305,28 +322,26 @@ public class BigQueryIO { return from(StaticValueProvider.of(tableSpec)); } - /** - * Same as {@code from(String)}, but with a {@link ValueProvider}. - */ + /** Same as {@code from(String)}, but with a {@link ValueProvider}. */ public Read from(ValueProvider<String> tableSpec) { ensureFromNotCalledYet(); return toBuilder() .setJsonTableRef( NestedValueProvider.of( NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), - new TableRefToJson())).build(); + new TableRefToJson())) + .build(); } /** * Reads results received after executing the given query. * - * <p>By default, the query results will be flattened -- see - * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs"> - * Jobs documentation</a> for more information. To disable flattening, use - * {@link BigQueryIO.Read#withoutResultFlattening}. 
+ * <p>By default, the query results will be flattened -- see "flattenResults" in the <a + * href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">Jobs documentation</a> for + * more information. To disable flattening, use {@link BigQueryIO.Read#withoutResultFlattening}. * - * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery - * Standard SQL dialect, use {@link BigQueryIO.Read#usingStandardSql}. + * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery Standard + * SQL dialect, use {@link BigQueryIO.Read#usingStandardSql}. */ public Read fromQuery(String query) { return fromQuery(StaticValueProvider.of(query)); @@ -352,17 +367,16 @@ public class BigQueryIO { + " pipeline, This validation can be disabled using #withoutValidation."; /** - * Disable validation that the table exists or the query succeeds prior to pipeline - * submission. Basic validation (such as ensuring that a query or table is specified) still - * occurs. + * Disable validation that the table exists or the query succeeds prior to pipeline submission. + * Basic validation (such as ensuring that a query or table is specified) still occurs. */ public Read withoutValidation() { return toBuilder().setValidate(false).build(); } /** - * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs"> - * flattening of query results</a>. + * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">flattening of + * query results</a>. * * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading * from a table will cause an error during validation. @@ -441,8 +455,8 @@ public class BigQueryIO { // Note that a table or query check can fail if the table or dataset are created by // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. // For these cases the withoutValidation method can be used to disable the check. - if (getValidate() && table != null && table.isAccessible() && table.get().getProjectId() - != null) { + if (getValidate() && table != null && table.isAccessible() + && table.get().getProjectId() != null) { checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); // Check for source table presence for early failure notification. DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); @@ -543,15 +557,15 @@ public class BigQueryIO { super.populateDisplayData(builder); builder .addIfNotNull(DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider())) - .withLabel("Table")) + .withLabel("Table")) .addIfNotNull(DisplayData.item("query", getQuery()) - .withLabel("Query")) + .withLabel("Query")) .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults()) - .withLabel("Flatten Query Results")) + .withLabel("Flatten Query Results")) .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql()) - .withLabel("Use Legacy SQL Dialect")) + .withLabel("Use Legacy SQL Dialect")) .addIfNotDefault(DisplayData.item("validation", getValidate()) - .withLabel("Validation Enabled"), + .withLabel("Validation Enabled"), true); } @@ -563,9 +577,7 @@ public class BigQueryIO { return getJsonTableRef() == null ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef()); } - /** - * Returns the table to read, or {@code null} if reading from a query instead. - */ + /** Returns the table to read, or {@code null} if reading from a query instead. 
*/ @Nullable public TableReference getTable() { ValueProvider<TableReference> provider = getTableProvider(); @@ -583,9 +595,10 @@ public class BigQueryIO { List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts(); if (counts.size() != 1) { String errorMessage = (counts.size() == 0 - ? "No destination uri file count received." - : String.format("More than one destination uri file count received. First two are %s, %s", - counts.get(0), counts.get(1))); + ? "No destination uri file count received." + : String.format( + "More than one destination uri file count received. First two are %s, %s", + counts.get(0), counts.get(1))); throw new RuntimeException(errorMessage); } long filesCount = counts.get(0); @@ -616,11 +629,12 @@ public class BigQueryIO { * or else the transform may fail at runtime with an {@link IllegalArgumentException}. * * <p>By default, writes require an empty table, which corresponds to - * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the - * default of BigQuery's Jobs API. + * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the default of + * BigQuery's Jobs API. + * + * <p>Here is a sample transform that produces TableRow values containing "word" and "count" + * columns: * - * <p>Here is a sample transform that produces TableRow values containing - * "word" and "count" columns: * <pre>{@code * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> { * public void processElement(ProcessContext c) { @@ -629,7 +643,8 @@ public class BigQueryIO { * .set("count", c.element().getValue().intValue()); * c.output(row); * } - * }}</pre> + * } + * }</pre> */ public static <T> Write<T> write() { return new AutoValue_BigQueryIO_Write.Builder<T>() @@ -642,8 +657,8 @@ public class BigQueryIO { } /** - * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows} - * to a BigQuery table. + * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows} to + * a BigQuery table. */ public static Write<TableRow> writeTableRows() { return BigQueryIO.<TableRow>write().withFormatFunction(IDENTITY_FORMATTER); @@ -671,7 +686,8 @@ public class BigQueryIO { @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination> getTableFunction(); @Nullable abstract SerializableFunction<T, TableRow> getFormatFunction(); - /** Table schema. The schema is required only if the table does not exist. 
*/ + @Nullable abstract DynamicDestinations<T, ?> getDynamicDestinations(); + @Nullable abstract PCollectionView<Map<String, String>> getSchemaFromView(); @Nullable abstract ValueProvider<String> getJsonSchema(); abstract CreateDisposition getCreateDisposition(); abstract WriteDisposition getWriteDisposition(); @@ -688,8 +704,9 @@ public class BigQueryIO { abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef); abstract Builder<T> setTableFunction( SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction); - abstract Builder<T> setFormatFunction( - SerializableFunction<T, TableRow> formatFunction); + abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction); + abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations); + abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view); abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema); abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition); abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition); @@ -703,8 +720,9 @@ public class BigQueryIO { /** * An enumeration type for the BigQuery create disposition strings. * - * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition"> - * <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a> + * @see + * <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition"> + * <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a> */ public enum CreateDisposition { /** @@ -715,17 +733,15 @@ public class BigQueryIO { CREATE_NEVER, /** - * Specifies that tables should be created if needed. This is the default - * behavior. + * Specifies that tables should be created if needed. This is the default behavior. * * <p>Requires that a table schema is provided via {@link BigQueryIO.Write#withSchema}. - * This precondition is checked before starting a job. The schema is - * not required to match an existing table's schema. + * This precondition is checked before starting a job. The schema is not required to match an + * existing table's schema. * - * <p>When this transformation is executed, if the output table does not - * exist, the table is created from the provided schema. Note that even if - * the table exists, it may be recreated if necessary when paired with a - * {@link WriteDisposition#WRITE_TRUNCATE}. + * <p>When this transformation is executed, if the output table does not exist, the table is + * created from the provided schema. Note that even if the table exists, it may be recreated + * if necessary when paired with a {@link WriteDisposition#WRITE_TRUNCATE}. */ CREATE_IF_NEEDED } @@ -733,50 +749,39 @@ public class BigQueryIO { /** * An enumeration type for the BigQuery write disposition strings. * - * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition"> - * <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a> + * @see <a + * href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition"> + * <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a> */ public enum WriteDisposition { /** * Specifies that write should replace a table. 
* - * <p>The replacement may occur in multiple steps - for instance by first - * removing the existing table, then creating a replacement, then filling - * it in. This is not an atomic operation, and external programs may - * see the table in any of these intermediate steps. + * <p>The replacement may occur in multiple steps - for instance by first removing the + * existing table, then creating a replacement, then filling it in. This is not an atomic + * operation, and external programs may see the table in any of these intermediate steps. */ WRITE_TRUNCATE, - /** - * Specifies that rows may be appended to an existing table. - */ + /** Specifies that rows may be appended to an existing table. */ WRITE_APPEND, /** - * Specifies that the output table must be empty. This is the default - * behavior. + * Specifies that the output table must be empty. This is the default behavior. * * <p>If the output table is not empty, the write fails at runtime. * - * <p>This check may occur long before data is written, and does not - * guarantee exclusive access to the table. If two programs are run - * concurrently, each specifying the same output table and - * a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is possible - * for both to succeed. + * <p>This check may occur long before data is written, and does not guarantee exclusive + * access to the table. If two programs are run concurrently, each specifying the same output + * table and a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is + * possible for both to succeed. */ WRITE_EMPTY } - /** Ensures that methods of the to() family are called at most once. */ - private void ensureToNotCalledYet() { - checkState( - getJsonTableRef() == null && getTable() == null - && getTableFunction() == null, "to() already called"); - } - /** - * Writes to the given table, specified in the format described in - * {@link BigQueryHelpers#parseTableSpec}. + * Writes to the given table, specified in the format described in {@link + * BigQueryHelpers#parseTableSpec}. */ public Write<T> to(String tableSpec) { return to(StaticValueProvider.of(tableSpec)); @@ -789,14 +794,11 @@ public class BigQueryIO { /** Same as {@link #to(String)}, but with a {@link ValueProvider}. */ public Write<T> to(ValueProvider<String> tableSpec) { - ensureToNotCalledYet(); - String tableDescription = getTableDescription(); return toBuilder() .setJsonTableRef( NestedValueProvider.of( NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), new TableRefToJson())) - .setTableFunction(new ConstantTableFunction<T>(tableSpec, tableDescription)) .build(); } @@ -806,18 +808,14 @@ public class BigQueryIO { */ public Write<T> to( SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) { - ensureToNotCalledYet(); return toBuilder().setTableFunction(tableFunction).build(); } /** - * Like {@link BigQueryIO.Write#to(SerializableFunction)}, but the function returns a - * {@link TableReference} instead of a string table specification. + * Writes to the table and schema specified by the {@link DynamicDestinations} object. 
*/ - private Write<T> toTableReference( - SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) { - ensureToNotCalledYet(); - return toBuilder().setTableFunction(tableFunction).build(); + public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) { + return toBuilder().setDynamicDestinations(dynamicDestinations).build(); } /** @@ -827,42 +825,45 @@ public class BigQueryIO { return toBuilder().setFormatFunction(formatFunction).build(); } - static class ConstantTableFunction<T> implements - SerializableFunction<ValueInSingleWindow<T>, TableDestination> { - private final ValueProvider<String> tableSpec; - private final String tableDescription; + /** + * Uses the specified schema for rows to be written. + * + * <p>The schema is <i>required</i> only if writing to a table that does not already exist, and + * {@link CreateDisposition} is set to {@link CreateDisposition#CREATE_IF_NEEDED}. + */ + public Write<T> withSchema(TableSchema schema) { + return withJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema))); + } - ConstantTableFunction(ValueProvider<String> tableSpec, String tableDescription) { - this.tableSpec = tableSpec; - this.tableDescription = tableDescription; - } + /** Same as {@link #withSchema(TableSchema)} but using a deferred {@link ValueProvider}. */ + public Write<T> withSchema(ValueProvider<TableSchema> schema) { + return withJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema())); + } - @Override - public TableDestination apply(ValueInSingleWindow<T> value) { - return new TableDestination(tableSpec.get(), tableDescription); - } + /** + * Similar to {@link #withSchema(TableSchema)} but takes in a JSON-serialized {@link + * TableSchema}. + */ + public Write<T> withJsonSchema(String jsonSchema) { + return withJsonSchema(StaticValueProvider.of(jsonSchema)); } /** - * Uses the specified schema for rows to be written. - * - * <p>The schema is <i>required</i> only if writing to a table that does not already - * exist, and {@link CreateDisposition} is set to - * {@link CreateDisposition#CREATE_IF_NEEDED}. + * Same as {@link #withJsonSchema(String)} but using a deferred {@link ValueProvider}. */ - public Write<T> withSchema(TableSchema schema) { - return toBuilder() - .setJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema))) - .build(); + public Write<T> withJsonSchema(ValueProvider<String> jsonSchema) { + return toBuilder().setJsonSchema(jsonSchema).build(); } /** - * Use the specified schema for rows to be written. + * Allows the schemas for each table to be computed within the pipeline itself. + * + * <p>The input is a map-valued {@link PCollectionView} mapping string tablespecs to + * JSON-formatted {@link TableSchema}s. Tablespecs must be in the same format as taken by + * {@link #to(String)}. */ - public Write<T> withSchema(ValueProvider<TableSchema> schema) { - return toBuilder() - .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema())) - .build(); + public Write<T> withSchemaFromView(PCollectionView<Map<String, String>> view) { + return toBuilder().setSchemaFromView(view).build(); } /** Specifies whether the table should be created if it does not exist. */ @@ -895,23 +896,38 @@ public class BigQueryIO { BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class); // We must have a destination to write to! 
- checkState(getTableFunction() != null, + checkState( + getTableFunction() != null || getJsonTableRef() != null + || getDynamicDestinations() != null, "must set the table reference of a BigQueryIO.Write transform"); checkArgument(getFormatFunction() != null, - "A function must be provided to convert type into a TableRow. " - + "use BigQueryIO.Write.withFormatFunction to provide a formatting function."); + "A function must be provided to convert type into a TableRow. " + + "use BigQueryIO.Write.withFormatFunction to provide a formatting function."); // Require a schema if creating one or more tables. - checkArgument( - getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null, + checkArgument(getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED + || getJsonSchema() != null + || getDynamicDestinations() != null + || getSchemaFromView() != null, "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); + List<?> allToArgs = Lists.newArrayList(getJsonTableRef(), getTableFunction(), + getDynamicDestinations()); + checkArgument(1 + == Iterables.size(Iterables.filter(allToArgs, Predicates.notNull())), + "Exactly one of jsonTableRef, tableFunction, or " + "dynamicDestinations must be set"); + + List<?> allSchemaArgs = Lists.newArrayList(getJsonSchema(), getSchemaFromView(), + getDynamicDestinations()); + checkArgument(2 + > Iterables.size(Iterables.filter(allSchemaArgs, Predicates.notNull())), + "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + + "be set"); + // The user specified a table. - if (getJsonTableRef() != null && getValidate()) { + if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) { TableReference table = getTableWithDefaultProject(options).get(); - // TODO: This seems wrong - what if the ValueProvider is not accessible? - DatasetService datasetService = getBigQueryServices().getDatasetService(options); // Check for destination table presence and emptiness for early failure notification. // Note that a presence check can fail when the table or dataset is created by an earlier @@ -929,26 +945,66 @@ public class BigQueryIO { @Override public WriteResult expand(PCollection<T> input) { - validate(input.getPipeline().getOptions()); - PCollection<KV<TableDestination, TableRow>> rowsWithDestination = - input.apply("PrepareWrite", new PrepareWrite<T>( - getTableFunction(), getFormatFunction())) - .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of())); + DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); + if (dynamicDestinations == null) { + if (getJsonTableRef() != null) { + dynamicDestinations = + DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef( + getJsonTableRef(), getTableDescription()); + } else if (getTableFunction() != null) { + dynamicDestinations = new TableFunctionDestinations(getTableFunction()); + } + // Wrap with a DynamicDestinations class that will provide a schema. There might be no + // schema provided if the create disposition is CREATE_NEVER. 
+ if (getJsonSchema() != null) { + dynamicDestinations = + new ConstantSchemaDestinations(dynamicDestinations, getJsonSchema()); + } else if (getSchemaFromView() != null) { + dynamicDestinations = + new SchemaFromViewDestinations(dynamicDestinations, getSchemaFromView()); + } + } + return expandTyped(input, dynamicDestinations); + } - // When writing an Unbounded PCollection, or when a tablespec function is defined, we use - // StreamingInserts and BigQuery's streaming import API. + private <DestinationT> WriteResult expandTyped( + PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) { + Coder<DestinationT> destinationCoder = null; + try { + destinationCoder = dynamicDestinations.getDestinationCoderWithDefault( + input.getPipeline().getCoderRegistry()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException(e); + } + + PCollection<KV<DestinationT, TableRow>> rowsWithDestination = + input + .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction())) + .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of())); + + // When writing an Unbounded PCollection, we use StreamingInserts and BigQuery's streaming + // import API. if (input.isBounded() == IsBounded.UNBOUNDED) { checkArgument( getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE, "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded" + " PCollection."); - - return rowsWithDestination.apply(new StreamingInserts(this)); + StreamingInserts<DestinationT> streamingInserts = + new StreamingInserts<>(getCreateDisposition(), dynamicDestinations); + streamingInserts.setTestServices(getBigQueryServices()); + return rowsWithDestination.apply(streamingInserts); } else { - return rowsWithDestination.apply(new BatchLoads(this)); + BatchLoads<DestinationT> batchLoads = new BatchLoads<>( + getWriteDisposition(), + getCreateDisposition(), + getJsonTableRef() != null, + dynamicDestinations, + destinationCoder); + batchLoads.setTestServices(getBigQueryServices()); + return rowsWithDestination.apply(batchLoads); } } @@ -961,32 +1017,28 @@ public class BigQueryIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("table", getJsonTableRef()) - .withLabel("Table Reference")) - .addIfNotNull(DisplayData.item("schema", getJsonSchema()) - .withLabel("Table Schema")); + builder.addIfNotNull(DisplayData.item("table", getJsonTableRef()) + .withLabel("Table Reference")); + if (getJsonSchema() != null) { + builder.addIfNotNull(DisplayData.item("schema", getJsonSchema()).withLabel("Table Schema")); + } else { + builder.add(DisplayData.item("schema", "Custom Schema Function").withLabel("Table Schema")); + } if (getTableFunction() != null) { builder.add(DisplayData.item("tableFn", getTableFunction().getClass()) - .withLabel("Table Reference Function")); + .withLabel("Table Reference Function")); } builder .add(DisplayData.item("createDisposition", getCreateDisposition().toString()) - .withLabel("Table CreateDisposition")) + .withLabel("Table CreateDisposition")) .add(DisplayData.item("writeDisposition", getWriteDisposition().toString()) - .withLabel("Table WriteDisposition")) + .withLabel("Table WriteDisposition")) .addIfNotDefault(DisplayData.item("validation", getValidate()) - .withLabel("Validation Enabled"), true) + .withLabel("Validation Enabled"), true) .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription()) - .withLabel("Table Description"), ""); - } - - /** 
Returns the table schema. */ - public TableSchema getSchema() { - return BigQueryHelpers.fromJsonString( - getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class); + .withLabel("Table Description"), ""); } /** @@ -994,7 +1046,8 @@ public class BigQueryIO { * * <p>If the table's project is not specified, use the executing project. */ - @Nullable ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) { + @Nullable + ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) { ValueProvider<TableReference> table = getTable(); if (table == null) { return table; @@ -1002,7 +1055,7 @@ public class BigQueryIO { if (!table.isAccessible()) { LOG.info("Using a dynamic value for table input. This must contain a project" - + " in the table reference: {}", table); + + " in the table reference: {}", table); return table; } if (Strings.isNullOrEmpty(table.get().getProjectId())) { @@ -1022,8 +1075,6 @@ public class BigQueryIO { return getJsonTableRef() == null ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef()); } - - } /** @@ -1038,5 +1089,4 @@ public class BigQueryIO { /** Disallow construction of utility class. */ private BigQueryIO() {} - } http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java new file mode 100644 index 0000000..db172dc --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CalculateSchemas.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.transforms.Distinct; +import org.apache.beam.sdk.transforms.DoFn; + +import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Compute the mapping of destinations to json-formatted schema objects. 
*/ +class CalculateSchemas<DestinationT> + extends PTransform< + PCollection<KV<DestinationT, TableRow>>, PCollectionView<Map<DestinationT, String>>> { + private static final Logger LOG = LoggerFactory.getLogger(CalculateSchemas.class); + + private final DynamicDestinations<?, DestinationT> dynamicDestinations; + + public CalculateSchemas(DynamicDestinations<?, DestinationT> dynamicDestinations) { + this.dynamicDestinations = dynamicDestinations; + } + + @Override + public PCollectionView<Map<DestinationT, String>> expand( + PCollection<KV<DestinationT, TableRow>> input) { + List<PCollectionView<?>> sideInputs = Lists.newArrayList(); + sideInputs.addAll(dynamicDestinations.getSideInputs()); + + return input + .apply("Keys", Keys.<DestinationT>create()) + .apply("Distinct Keys", Distinct.<DestinationT>create()) + .apply( + "GetSchemas", + ParDo.of( + new DoFn<DestinationT, KV<DestinationT, String>>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + dynamicDestinations.setSideInputAccessorFromProcessContext(c); + TableSchema tableSchema = dynamicDestinations.getSchema(c.element()); + if (tableSchema != null) { + // If the createDisposition is CREATE_NEVER, then there's no need for a + // schema, and getSchema might return null. In this case, we simply + // leave it out of the map. + c.output(KV.of(c.element(), BigQueryHelpers.toJsonString(tableSchema))); + } + } + }) + .withSideInputs(sideInputs)) + .apply("asMap", View.<DestinationT, String>asMap()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index a377af7..210a072 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -22,8 +22,11 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -32,20 +35,20 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; /** * Creates any tables needed before performing streaming writes to the tables. This is a side-effect * {@link DoFn}; each input row is passed through, re-keyed by its resolved {@link TableDestination}. 
*/ -public class CreateTables +public class CreateTables<DestinationT> extends PTransform< - PCollection<KV<TableDestination, TableRow>>, PCollection<KV<TableDestination, TableRow>>> { + PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, TableRow>>> { private final CreateDisposition createDisposition; private final BigQueryServices bqServices; - private final SerializableFunction<TableDestination, TableSchema> schemaFunction; + private final DynamicDestinations<?, DestinationT> dynamicDestinations; /** * The list of tables created so far, so we don't try the creation each time. @@ -57,40 +60,59 @@ public class CreateTables public CreateTables( CreateDisposition createDisposition, - SerializableFunction<TableDestination, TableSchema> schemaFunction) { - this(createDisposition, new BigQueryServicesImpl(), schemaFunction); + DynamicDestinations<?, DestinationT> dynamicDestinations) { + this(createDisposition, new BigQueryServicesImpl(), dynamicDestinations); } private CreateTables( CreateDisposition createDisposition, BigQueryServices bqServices, - SerializableFunction<TableDestination, TableSchema> schemaFunction) { + DynamicDestinations<?, DestinationT> dynamicDestinations) { this.createDisposition = createDisposition; this.bqServices = bqServices; - this.schemaFunction = schemaFunction; + this.dynamicDestinations = dynamicDestinations; } - CreateTables withTestServices(BigQueryServices bqServices) { - return new CreateTables(createDisposition, bqServices, schemaFunction); + CreateTables<DestinationT> withTestServices(BigQueryServices bqServices) { + return new CreateTables<DestinationT>(createDisposition, bqServices, dynamicDestinations); } @Override public PCollection<KV<TableDestination, TableRow>> expand( - PCollection<KV<TableDestination, TableRow>> input) { + PCollection<KV<DestinationT, TableRow>> input) { + List<PCollectionView<?>> sideInputs = Lists.newArrayList(); + sideInputs.addAll(dynamicDestinations.getSideInputs()); + return input.apply( ParDo.of( - new DoFn<KV<TableDestination, TableRow>, KV<TableDestination, TableRow>>() { - @ProcessElement - public void processElement(ProcessContext context) - throws InterruptedException, IOException { - BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); - possibleCreateTable(options, context.element().getKey()); - context.output(context.element()); - } - })); + new DoFn<KV<DestinationT, TableRow>, KV<TableDestination, TableRow>>() { + @ProcessElement + public void processElement(ProcessContext context) + throws InterruptedException, IOException { + dynamicDestinations.setSideInputAccessorFromProcessContext(context); + TableDestination tableDestination = + dynamicDestinations.getTable(context.element().getKey()); + TableReference tableReference = tableDestination.getTableReference(); + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId( + context.getPipelineOptions().as(BigQueryOptions.class).getProject()); + tableDestination = + new TableDestination( + tableReference, tableDestination.getTableDescription()); + } + TableSchema tableSchema = + dynamicDestinations.getSchema(context.element().getKey()); + BigQueryOptions options = + context.getPipelineOptions().as(BigQueryOptions.class); + possibleCreateTable(options, tableDestination, tableSchema); + context.output(KV.of(tableDestination, context.element().getValue())); + } + }) + .withSideInputs(sideInputs)); } - private void possibleCreateTable(BigQueryOptions options, TableDestination 
tableDestination) + private void possibleCreateTable( + BigQueryOptions options, TableDestination tableDestination, TableSchema tableSchema) throws InterruptedException, IOException { String tableSpec = tableDestination.getTableSpec(); TableReference tableReference = tableDestination.getTableReference(); @@ -102,7 +124,6 @@ public class CreateTables // every thread from attempting a create and overwhelming our BigQuery quota. DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { - TableSchema tableSchema = schemaFunction.apply(tableDestination); if (datasetService.getTable(tableReference) == null) { datasetService.createTable( new Table() http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java new file mode 100644 index 0000000..dc8bcff --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ValueInSingleWindow; + +/** + * This class provides the most general way of specifying dynamic BigQuery table destinations. + * Destinations can be extracted from the input element, and stored as a custom type. Mappings are + * provided to convert the destination into a BigQuery table reference and a BigQuery schema. The + * class can read side inputs while performing these mappings. + * + * <p>For example, consider a PCollection of events, each containing a user-id field. You want to + * write each user's events to a separate table with a separate schema per user. Since the user-id + * field is a string, you will represent the destination as a string. 
+ * + * <pre>{@code + * events.apply(BigQueryIO.<UserEvent>write() + * .to(new DynamicDestinations<UserEvent, String>() { + * public String getDestination(ValueInSingleWindow<UserEvent> element) { + * return element.getValue().getUserId(); + * } + * public TableDestination getTable(String user) { + * return new TableDestination(tableForUser(user), "Table for user " + user); + * } + * public TableSchema getSchema(String user) { + * return tableSchemaForUser(user); + * } + * }) + * .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() { + * public TableRow apply(UserEvent event) { + * return convertUserEventToTableRow(event); + * } + * })); + * }</pre> + * + * <p>An instance of {@link DynamicDestinations} can also use side inputs using {@link + * #sideInput(PCollectionView)}. The side inputs must be present in {@link #getSideInputs()}. + * Side inputs are accessed in the global window, so they must be globally windowed. + * + * <p>{@code DestinationT} is expected to provide proper hash and equality members. Ideally it will + * be a compact type with an efficient coder, as these objects may be used as a key in a {@link + * org.apache.beam.sdk.transforms.GroupByKey}. + */ +public abstract class DynamicDestinations<T, DestinationT> implements Serializable { + interface SideInputAccessor { + <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view); + } + + private SideInputAccessor sideInputAccessor; + + static class SideInputAccessorViaProcessContext implements SideInputAccessor { + private DoFn<?, ?>.ProcessContext processContext; + + SideInputAccessorViaProcessContext(DoFn<?, ?>.ProcessContext processContext) { + this.processContext = processContext; + } + + @Override + public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) { + return processContext.sideInput(view); + } + } + + /** + * Specifies that this object needs access to one or more side inputs. These side inputs must be + * globally windowed, as they will be accessed from the global window. + */ + public List<PCollectionView<?>> getSideInputs() { + return Lists.newArrayList(); + } + + /** + * Returns the value of a given side input. The view must be present in {@link #getSideInputs()}. + */ + protected <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) { + checkArgument( + getSideInputs().contains(view), + "View %s not declared in getSideInputs() (%s)", + view, + getSideInputs()); + return sideInputAccessor.sideInput(view); + } + + void setSideInputAccessor(SideInputAccessor sideInputAccessor) { + this.sideInputAccessor = sideInputAccessor; + } + + void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) { + this.sideInputAccessor = new SideInputAccessorViaProcessContext(context); + } + + /** + * Returns an object that represents, at a high level, which table is being written to. May not + * return null. + */ + public abstract DestinationT getDestination(ValueInSingleWindow<T> element); + + /** + * Returns the coder for {@link DestinationT}. If this is not overridden, then + * {@link BigQueryIO} will look in the coder registry for a suitable coder. This must be a + * deterministic coder, as {@link DestinationT} will be used as a key type in a + * {@link org.apache.beam.sdk.transforms.GroupByKey}. + */ + @Nullable + public Coder<DestinationT> getDestinationCoder() { + return null; + } + + /** + * Returns a {@link TableDestination} object for the destination. May not return null. 
+ */ + public abstract TableDestination getTable(DestinationT destination); + + /** + * Returns the table schema for the destination. May return null when no schema is required, + * e.g. if the create disposition is CREATE_NEVER. + */ + public abstract TableSchema getSchema(DestinationT destination); + + // Gets the destination coder. If the user does not provide one, try to find one in the coder + // registry. If no coder can be found, throws CannotProvideCoderException. + Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry) + throws CannotProvideCoderException { + Coder<DestinationT> destinationCoder = getDestinationCoder(); + if (destinationCoder != null) { + return destinationCoder; + } + // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. + // We must first use reflection to figure out what the type parameter is. + for (Type superclass = getClass().getGenericSuperclass(); + superclass != null; + superclass = ((Class) superclass).getGenericSuperclass()) { + if (superclass instanceof ParameterizedType) { + ParameterizedType parameterized = (ParameterizedType) superclass; + if (parameterized.getRawType() == DynamicDestinations.class) { + // DestinationT is the second parameter. + Type parameter = parameterized.getActualTypeArguments()[1]; + @SuppressWarnings("unchecked") + Class<DestinationT> parameterClass = (Class<DestinationT>) parameter; + return registry.getDefaultCoder(parameterClass); + } + } + } + throw new AssertionError( + "Couldn't find the DynamicDestinations superclass of " + this.getClass()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java new file mode 100644 index 0000000..72a3314 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.ValueInSingleWindow; + +/** + * Contains some useful helper instances of {@link DynamicDestinations}. + */ +class DynamicDestinationsHelpers { + /** + * Always returns a constant table destination. + */ + static class ConstantTableDestinations<T> extends DynamicDestinations<T, TableDestination> { + private final ValueProvider<String> tableSpec; + private final String tableDescription; + + ConstantTableDestinations(ValueProvider<String> tableSpec, String tableDescription) { + this.tableSpec = tableSpec; + this.tableDescription = tableDescription; + } + + static <T> ConstantTableDestinations<T> fromTableSpec( + ValueProvider<String> tableSpec, String tableDescription) { + return new ConstantTableDestinations<T>(tableSpec, tableDescription); + } + + static <T> ConstantTableDestinations<T> fromJsonTableRef( + ValueProvider<String> jsonTableRef, String tableDescription) { + return new ConstantTableDestinations<T>( + NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableSpec()), tableDescription); + } + + @Override + public TableDestination getDestination(ValueInSingleWindow<T> element) { + return new TableDestination(tableSpec.get(), tableDescription); + } + + @Override + public TableSchema getSchema(TableDestination destination) { + return null; + } + + @Override + public TableDestination getTable(TableDestination destination) { + return destination; + } + + @Override + public Coder<TableDestination> getDestinationCoder() { + return TableDestinationCoder.of(); + } + } + + /** + * Returns a table destination based on a user-supplied function. + */ + static class TableFunctionDestinations<T> extends DynamicDestinations<T, TableDestination> { + private final SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction; + + TableFunctionDestinations( + SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) { + this.tableFunction = tableFunction; + } + + @Override + public TableDestination getDestination(ValueInSingleWindow<T> element) { + return tableFunction.apply(element); + } + + @Override + public TableSchema getSchema(TableDestination destination) { + return null; + } + + @Override + public TableDestination getTable(TableDestination destination) { + return destination; + } + + @Override + public Coder<TableDestination> getDestinationCoder() { + return TableDestinationCoder.of(); + } + } + + /** + * Delegates all calls to an inner instance of {@link DynamicDestinations}. This allows + * subclasses to modify another instance of {@link DynamicDestinations} by subclassing and + * overriding just the methods they want to alter. 
+ */ + static class DelegatingDynamicDestinations<T, DestinationT> + extends DynamicDestinations<T, DestinationT> { + private final DynamicDestinations<T, DestinationT> inner; + DelegatingDynamicDestinations(DynamicDestinations<T, DestinationT> inner) { + this.inner = inner; + } + @Override + public DestinationT getDestination(ValueInSingleWindow<T> element) { + return inner.getDestination(element); + } + + @Override + public TableSchema getSchema(DestinationT destination) { + return inner.getSchema(destination); + } + + @Override + public TableDestination getTable(DestinationT destination) { + return inner.getTable(destination); + } + + @Override + public Coder<DestinationT> getDestinationCoder() { + return inner.getDestinationCoder(); + } + } + + /** + * Returns the same schema for every table. + */ + static class ConstantSchemaDestinations<T> + extends DelegatingDynamicDestinations<T, TableDestination> { + @Nullable + private final ValueProvider<String> jsonSchema; + + ConstantSchemaDestinations(DynamicDestinations<T, TableDestination> inner, + ValueProvider<String> jsonSchema) { + super(inner); + this.jsonSchema = jsonSchema; + } + + @Override + public TableSchema getSchema(TableDestination destination) { + return BigQueryHelpers.fromJsonString(jsonSchema.get(), TableSchema.class); + } + } + + /** + * Takes in a side input mapping table specs to JSON-formatted table schemas, and always returns + * the matching schema from the side input. + */ + static class SchemaFromViewDestinations<T> + extends DelegatingDynamicDestinations<T, TableDestination> { + PCollectionView<Map<String, String>> schemaView; + SchemaFromViewDestinations(DynamicDestinations<T, TableDestination> inner, + PCollectionView<Map<String, String>> schemaView) { + super(inner); + this.schemaView = schemaView; + } + + @Override + public List<PCollectionView<?>> getSideInputs() { + return ImmutableList.<PCollectionView<?>>builder().add(schemaView).build(); + } + + @Override + public TableSchema getSchema(TableDestination destination) { + Map<String, String> mapValue = sideInput(schemaView); + return BigQueryHelpers.fromJsonString(mapValue.get(destination.getTableSpec()), + TableSchema.class); + } + } +}
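
To make the side-input contract above concrete, here is a minimal sketch of a DynamicDestinations subclass that routes each event to a per-user table and reads schemas from a side-input map. It is illustrative only, not part of this commit: UserEvent, getUserId(), and the schemasView map view are assumed names, and the class is assumed to live in the org.apache.beam.sdk.io.gcp.bigquery package so it can use the package-private BigQueryHelpers.

import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;

class PerUserDestinations extends DynamicDestinations<UserEvent, String> {
  // Hypothetical side input: a globally-windowed map from user id to JSON-formatted schema.
  private final PCollectionView<Map<String, String>> schemasView;

  PerUserDestinations(PCollectionView<Map<String, String>> schemasView) {
    this.schemasView = schemasView;
  }

  @Override
  public List<PCollectionView<?>> getSideInputs() {
    // The view must be declared here; otherwise sideInput(schemasView) below
    // fails the checkArgument in DynamicDestinations.sideInput().
    return ImmutableList.<PCollectionView<?>>of(schemasView);
  }

  @Override
  public String getDestination(ValueInSingleWindow<UserEvent> element) {
    return element.getValue().getUserId();
  }

  @Override
  public TableDestination getTable(String user) {
    return new TableDestination(
        "my-project:my_dataset.events_" + user, "Events for user " + user);
  }

  @Override
  public TableSchema getSchema(String user) {
    // Legal here because schemasView is listed in getSideInputs().
    return BigQueryHelpers.fromJsonString(sideInput(schemasView).get(user), TableSchema.class);
  }
}

Because the extends clause fixes DestinationT to String, getDestinationCoderWithDefault() can recover String.class reflectively and fall back to the coder registry's default String coder, so getDestinationCoder() does not need to be overridden here.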

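Under the same assumptions, wiring the sketch into a pipeline might look like the following, matching the usage shown in the DynamicDestinations javadoc above. Here events (a PCollection<UserEvent>), schemas (a PCollection<KV<String, String>> of per-user schema JSON), and convertUserEventToTableRow are all hypothetical:

import com.google.api.services.bigquery.model.TableRow;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

// Side inputs are read in the global window, so the map view must be built
// from a globally-windowed collection.
PCollectionView<Map<String, String>> schemasView =
    schemas.apply("SchemasAsMap", View.<String, String>asMap());

events.apply(
    BigQueryIO.<UserEvent>write()
        .to(new PerUserDestinations(schemasView))
        .withFormatFunction(
            new SerializableFunction<UserEvent, TableRow>() {
              @Override
              public TableRow apply(UserEvent event) {
                return convertUserEventToTableRow(event); // hypothetical converter
              }
            }));

Per the expandTyped() logic earlier in this commit, such a write is routed through StreamingInserts when the input PCollection is unbounded and through BatchLoads when it is bounded.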