Use tableRefFunction throughout BigQueryIO. Constant table writes use ConstantTableSpecFunction.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c939a436 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c939a436 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c939a436 Branch: refs/heads/DSL_SQL Commit: c939a43617cdb37228625a34b3545377b142fc8a Parents: e0df7d8 Author: Reuven Lax <[email protected]> Authored: Tue Mar 28 11:21:59 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 21:12:49 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 57 ++++++++++---------- .../sdk/io/gcp/bigquery/StreamWithDeDup.java | 4 +- .../gcp/bigquery/TagWithUniqueIdsAndTable.java | 57 ++++++-------------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 19 ++----- 4 files changed, 50 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 9753da5..af0d561 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -700,7 +700,8 @@ public class BigQueryIO { abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef); abstract Builder<T> setTableRefFunction( SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction); - abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction); + abstract Builder<T> setFormatFunction( + SerializableFunction<T, TableRow> formatFunction); abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema); abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition); abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition); @@ -781,7 +782,8 @@ public class BigQueryIO { /** Ensures that methods of the to() family are called at most once. */ private void ensureToNotCalledYet() { checkState( - getJsonTableRef() == null && getTable() == null, "to() already called"); + getJsonTableRef() == null && getTable() == null + && getTableRefFunction() == null, "to() already called"); } /** @@ -805,6 +807,8 @@ public class BigQueryIO { NestedValueProvider.of( NestedValueProvider.of(tableSpec, new TableSpecToTableRef()), new TableRefToJson())) + .setTableRefFunction(new TranslateTableSpecFunction<T>( + new ConstantTableSpecFunction<T>(tableSpec))) .build(); } @@ -812,7 +816,8 @@ public class BigQueryIO { * Writes to table specified by the specified table function. The table is a function of * {@link ValueInSingleWindow}, so can be determined by the value or by the window. */ - public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) { + public Write<T> to( + SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) { return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction)); } @@ -848,6 +853,20 @@ public class BigQueryIO { } } + static class ConstantTableSpecFunction<T> implements + SerializableFunction<ValueInSingleWindow<T>, String> { + private ValueProvider<String> tableSpec; + + ConstantTableSpecFunction(ValueProvider<String> tableSpec) { + this.tableSpec = tableSpec; + } + + @Override + public String apply(ValueInSingleWindow<T> value) { + return tableSpec.get(); + } + } + /** * Uses the specified schema for rows to be written. * @@ -900,13 +919,8 @@ public class BigQueryIO { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); // Exactly one of the table and table reference can be configured. - checkState( - getJsonTableRef() != null || getTableRefFunction() != null, + checkState(getTableRefFunction() != null, "must set the table reference of a BigQueryIO.Write transform"); - checkState( - getJsonTableRef() == null || getTableRefFunction() == null, - "Cannot set both a table reference and a table function for a BigQueryIO.Write" - + " transform"); checkArgument(getFormatFunction() != null, "A function must be provided to convert type into a TableRow. " @@ -920,6 +934,7 @@ public class BigQueryIO { // The user specified a table. if (getJsonTableRef() != null && getValidate()) { TableReference table = getTableWithDefaultProject(options).get(); + // TODO: This seems wrong - what if the ValueProvider is not accessible? DatasetService datasetService = getBigQueryServices().getDatasetService(options); // Check for destination table presence and emptiness for early failure notification. @@ -935,24 +950,12 @@ public class BigQueryIO { } } - if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) { + if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) { // We will use BigQuery's streaming write API -- validate supported dispositions. - if (getTableRefFunction() != null) { - checkArgument( - getCreateDisposition() != CreateDisposition.CREATE_NEVER, - "CreateDisposition.CREATE_NEVER is not supported when using a tablespec" - + " function."); - } - if (getJsonSchema() == null) { - checkArgument( - getCreateDisposition() == CreateDisposition.CREATE_NEVER, - "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null."); - } - checkArgument( getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE, - "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or" - + " when using a tablespec function."); + "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded" + + " PCollection."); } else { // We will use a BigQuery load job -- validate the temp location. String tempLocation = options.getTempLocation(); @@ -977,7 +980,7 @@ public class BigQueryIO { public WriteResult expand(PCollection<T> input) { // When writing an Unbounded PCollection, or when a tablespec function is defined, we use // StreamWithDeDup and BigQuery's streaming import API. - if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) { + if (input.isBounded() == IsBounded.UNBOUNDED) { return input.apply(new StreamWithDeDup<T>(this)); } else { return input.apply(new BatchLoadBigQuery<T>(this)); @@ -1026,12 +1029,12 @@ public class BigQueryIO { * * <p>If the table's project is not specified, use the executing project. */ - @Nullable ValueProvider<TableReference> getTableWithDefaultProject( - BigQueryOptions bqOptions) { + @Nullable ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) { ValueProvider<TableReference> table = getTable(); if (table == null) { return table; } + if (!table.isAccessible()) { LOG.info("Using a dynamic value for table input. This must contain a project" + " in the table reference: {}", table); http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java index 1fa26d1..506a564 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java @@ -64,8 +64,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> { PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>( - input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(), - write.getTableRefFunction(), write.getFormatFunction()))); + input.getPipeline().getOptions().as(BigQueryOptions.class), write))); // To prevent having the same TableRow processed more than once with regenerated // different unique ids, this implementation relies on "checkpointing", which is @@ -85,6 +84,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> { write.getCreateDisposition(), write.getTableDescription(), write.getBigQueryServices()))); + return WriteResult.in(input.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java index a6608e4..8d7d1e6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java @@ -18,23 +18,18 @@ package org.apache.beam.sdk.io.gcp.bigquery; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -49,39 +44,22 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; @VisibleForTesting class TagWithUniqueIdsAndTable<T> extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> { - /** TableSpec to write to. */ - private final ValueProvider<String> tableSpec; - - /** User function mapping windowed values to {@link TableReference} in JSON. */ - private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction; + /** TableSpec to write to in the case of a single static destination. */ + private ValueProvider<String> tableSpec = null; - /** User function mapping user type to a TableRow. */ - private final SerializableFunction<T, TableRow> formatFunction; + private final Write<T, ?> write; private transient String randomUUID; private transient long sequenceNo = 0L; TagWithUniqueIdsAndTable(BigQueryOptions options, - ValueProvider<TableReference> table, - SerializableFunction<ValueInSingleWindow<T>, TableReference> - tableRefFunction, - SerializableFunction<T, TableRow> formatFunction) { - checkArgument(table == null ^ tableRefFunction == null, - "Exactly one of table or tableRefFunction should be set"); + Write<T, ?> write) { + ValueProvider<TableReference> table = write.getTableWithDefaultProject( + options.as(BigQueryOptions.class)); if (table != null) { - if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) { - TableReference tableRef = table.get() - .setProjectId(options.as(BigQueryOptions.class).getProject()); - table = NestedValueProvider.of( - StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)), - new JsonTableRefToTableRef()); - } this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec()); - } else { - tableSpec = null; } - this.tableRefFunction = tableRefFunction; - this.formatFunction = formatFunction; + this.write = write; } @@ -101,7 +79,7 @@ class TagWithUniqueIdsAndTable<T> // We output on keys 0-50 to ensure that there's enough batching for // BigQuery. context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)), - new TableRowInfo(formatFunction.apply(context.element()), uniqueId))); + new TableRowInfo(write.getFormatFunction().apply(context.element()), uniqueId))); } @Override @@ -109,10 +87,8 @@ class TagWithUniqueIdsAndTable<T> super.populateDisplayData(builder); builder.addIfNotNull(DisplayData.item("table", tableSpec)); - if (tableRefFunction != null) { - builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) + builder.add(DisplayData.item("tableFn", write.getTableRefFunction().getClass()) .withLabel("Table Reference Function")); - } } @VisibleForTesting @@ -120,16 +96,13 @@ class TagWithUniqueIdsAndTable<T> return tableSpec; } + private String tableSpecFromWindowedValue(BigQueryOptions options, ValueInSingleWindow<T> value) { - if (tableSpec != null) { - return tableSpec.get(); - } else { - TableReference table = tableRefFunction.apply(value); - if (table.getProjectId() == null) { - table.setProjectId(options.getProject()); - } - return BigQueryHelpers.toTableSpec(table); + TableReference table = write.getTableRefFunction().apply(value); + if (Strings.isNullOrEmpty(table.getProjectId())) { + table.setProjectId(options.getProject()); } + return BigQueryHelpers.toTableSpec(table); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 83fd8d9..499aa74 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -103,7 +102,6 @@ import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; @@ -150,6 +148,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; @@ -1375,7 +1374,8 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildWriteDefaultProject() { - BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to("somedataset.sometable"); + BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows() + .to("somedataset" + ".sometable"); checkWriteObject( write, null, "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, @@ -2350,19 +2350,6 @@ public class BigQueryIOTest implements Serializable { DisplayData.from(write); } - @Test - public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() { - BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); - bqOptions.setProject("project"); - TagWithUniqueIdsAndTable<TableRow> tag = - new TagWithUniqueIdsAndTable<TableRow>( - bqOptions, NestedValueProvider.of( - StaticValueProvider.of("data_set.table_name"), - new TableSpecToTableRef()), null, null); - TableReference table = BigQueryHelpers.parseTableSpec(tag.getTableSpec().get()); - assertNotNull(table.getProjectId()); - } - private static void testNumFiles(File tempDir, int expectedNumFiles) { assertEquals(expectedNumFiles, tempDir.listFiles(new FileFilter() { @Override
