BigQueryIO: fix streaming write, typo in API and improve testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fb4f5de Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fb4f5de Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fb4f5de Branch: refs/heads/gearpump-runner Commit: 5fb4f5de9515db717818f1e3ffd7ca3c6eba5614 Parents: 4206408 Author: Sam McVeety <s...@google.com> Authored: Fri Dec 16 18:10:28 2016 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri Dec 16 23:53:49 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 23 +++++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 72 ++++++++++++-------- 2 files changed, 63 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fb4f5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 28049ed..7bb1e51 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -370,7 +370,8 @@ public class BigQueryIO { } } - private static class TableSpecToTableRef + @VisibleForTesting + static class TableSpecToTableRef implements SerializableFunction<String, TableReference> { @Override public TableReference apply(String from) { @@ -807,6 +808,7 @@ public class BigQueryIO { /** * Returns the query to be read, or {@code null} if reading from a table instead. */ + @Nullable public String getQuery() { return query == null ? null : query.get(); } @@ -814,7 +816,8 @@ public class BigQueryIO { /** * Returns the query to be read, or {@code null} if reading from a table instead. */ - public ValueProvider<String> getQueryProivder() { + @Nullable + public ValueProvider<String> getQueryProvider() { return query; } @@ -2813,7 +2816,8 @@ public class BigQueryIO { * a randomUUID is generated only once per bucket of data. The actual unique * id is created by concatenating this randomUUID with a sequential number. */ - private static class TagWithUniqueIdsAndTable + @VisibleForTesting + static class TagWithUniqueIdsAndTable extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> { /** TableSpec to write to. */ private final ValueProvider<String> tableSpec; @@ -2830,8 +2834,12 @@ public class BigQueryIO { checkArgument(table == null ^ tableRefFunction == null, "Exactly one of table or tableRefFunction should be set"); if (table != null) { - if (table.isAccessible() && table.get().getProjectId() == null) { - table.get().setProjectId(options.as(BigQueryOptions.class).getProject()); + if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) { + TableReference tableRef = table.get() + .setProjectId(options.as(BigQueryOptions.class).getProject()); + table = NestedValueProvider.of( + StaticValueProvider.of(toJsonString(tableRef)), + new JsonTableRefToTableRef()); } this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec()); } else { @@ -2870,6 +2878,11 @@ public class BigQueryIO { } } + @VisibleForTesting + ValueProvider<String> getTableSpec() { + return tableSpec; + } + private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) { if (tableSpec != null) { return tableSpec.get(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fb4f5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index b78316f..dc566d2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -2242,43 +2243,60 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException { + public void testRuntimeOptionsNotCalledInApplyInputTable() { RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService()); + bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); - pipeline - .apply(BigQueryIO.Read - .from(options.getInputTable()).withoutValidation() - .withTestServices(fakeBqServices)) - .apply(BigQueryIO.Write - .to(options.getOutputTable()) - .withSchema(NestedValueProvider.of( - options.getOutputSchema(), new JsonSchemaToTableSchema())) - .withTestServices(fakeBqServices) - .withoutValidation()); + BigQueryIO.Read.Bound read = BigQueryIO.Read.from( + options.getInputTable()).withoutValidation(); + pipeline.apply(read); + // Test that this doesn't throw. + DisplayData.from(read); } @Test - public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException { + public void testRuntimeOptionsNotCalledInApplyInputQuery() { RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService(new FakeJobService()); + bqOptions.setTempLocation("gs://testbucket/testdir"); Pipeline pipeline = TestPipeline.create(options); + BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery( + options.getInputQuery()).withoutValidation(); + pipeline.apply(read); + // Test that this doesn't throw. + DisplayData.from(read); + } + + @Test + public void testRuntimeOptionsNotCalledInApplyOutput() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + bqOptions.setTempLocation("gs://testbucket/testdir"); + Pipeline pipeline = TestPipeline.create(options); + BigQueryIO.Write.Bound write = BigQueryIO.Write + .to(options.getOutputTable()) + .withSchema(NestedValueProvider.of( + options.getOutputSchema(), new JsonSchemaToTableSchema())) + .withoutValidation(); pipeline - .apply(BigQueryIO.Read - .fromQuery(options.getInputQuery()).withoutValidation() - .withTestServices(fakeBqServices)) - .apply(BigQueryIO.Write - .to(options.getOutputTable()) - .withSchema(NestedValueProvider.of( - options.getOutputSchema(), new JsonSchemaToTableSchema())) - .withTestServices(fakeBqServices) - .withoutValidation()); + .apply(Create.<TableRow>of()) + .apply(write); + // Test that this doesn't throw. + DisplayData.from(write); + } + + @Test + public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() { + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("project"); + BigQueryIO.TagWithUniqueIdsAndTable tag = + new BigQueryIO.TagWithUniqueIdsAndTable( + bqOptions, NestedValueProvider.of( + StaticValueProvider.of("data_set.table_name"), + new BigQueryIO.TableSpecToTableRef()), null); + TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get()); + assertNotNull(table.getProjectId()); } private static void testNumFiles(File tempDir, int expectedNumFiles) {