Add PrepareWrite transform.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/67a5f827 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/67a5f827 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/67a5f827 Branch: refs/heads/DSL_SQL Commit: 67a5f82706e52fe025b63aa2e9652368f22c8344 Parents: c939a43 Author: Reuven Lax <[email protected]> Authored: Tue Mar 28 12:53:27 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 21:12:49 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/PrepareWrite.java | 58 ++++++++++++++++++++ .../sdk/io/gcp/bigquery/TableDestination.java | 7 +++ .../gcp/bigquery/TagWithUniqueIdsAndTable.java | 15 ++--- 3 files changed, 69 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/67a5f827/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java new file mode 100644 index 0000000..0c08e18 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java @@ -0,0 +1,58 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.base.Strings; +import java.io.IOException; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import 
org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.ValueInSingleWindow; + +/** + * Prepare an input {@link PCollection} for writing to BigQuery. Use the table-reference + * function to determine which table each element is written to, and format the element into a + * {@link TableRow} using the user-supplied format function. + */ +public class PrepareWrite<T> extends PTransform<PCollection<T>, PCollection<KV<String, TableRow>>> { + private static final String NAME = "PrepareWrite"; + private SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction; + private SerializableFunction<T, TableRow> formatFunction; + + public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction, + SerializableFunction<T, TableRow> formatFunction) { + super(NAME); + this.tableRefFunction = tableRefFunction; + this.formatFunction = formatFunction; + } + + @Override + public PCollection<KV<String, TableRow>> expand(PCollection<T> input) { + PCollection<KV<String, TableRow>> elementsByTable = + input.apply(ParDo.of(new DoFn<T, KV<String, TableRow>>() { + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) throws IOException { + String tableSpec = tableSpecFromWindowedValue( + context.getPipelineOptions().as(BigQueryOptions.class), + ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); + TableRow tableRow = formatFunction.apply(context.element()); + context.output(KV.of(tableSpec, tableRow)); + } + })); + return elementsByTable; + } + + private String tableSpecFromWindowedValue(BigQueryOptions options, + ValueInSingleWindow<T> value) { + TableReference table = tableRefFunction.apply(value); + if (Strings.isNullOrEmpty(table.getProjectId())) { + 
table.setProjectId(options.getProject()); + } + return BigQueryHelpers.toTableSpec(table); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/67a5f827/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java new file mode 100644 index 0000000..3cbbf3b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -0,0 +1,7 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +/** + * Created by relax on 3/28/17. + */ +public class TableDestination { +} http://git-wip-us.apache.org/repos/asf/beam/blob/67a5f827/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java index 8d7d1e6..4e50f7c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java @@ -73,9 +73,9 @@ class TagWithUniqueIdsAndTable<T> public void processElement(ProcessContext context, BoundedWindow window) throws IOException { String uniqueId = randomUUID + sequenceNo++; ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); - String tableSpec = tableSpecFromWindowedValue( - 
context.getPipelineOptions().as(BigQueryOptions.class), - ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); + String tableSpec = tableSpecFromWindowedValue( + context.getPipelineOptions().as(BigQueryOptions.class), + ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); // We output on keys 0-50 to ensure that there's enough batching for // BigQuery. context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)), @@ -97,12 +97,5 @@ class TagWithUniqueIdsAndTable<T> } - private String tableSpecFromWindowedValue(BigQueryOptions options, - ValueInSingleWindow<T> value) { - TableReference table = write.getTableRefFunction().apply(value); - if (Strings.isNullOrEmpty(table.getProjectId())) { - table.setProjectId(options.getProject()); - } - return BigQueryHelpers.toTableSpec(table); - } + }
