Repository: incubator-beam Updated Branches: refs/heads/master ab74eac34 -> 4f580f5f1
[BEAM-377] Validate that BigQueryIO.Read is properly configured. Previously, using withoutValidation would disable all validation, leading to a NullPointerException if neither a table nor a query was provided. The intention of the withoutValidation parameter is to bypass more expensive (and possibly incorrect) checks, such as the existence of the table prior to pipeline execution in cases where earlier stages create the table. This change makes the basic usage validation always run, while the extended validation can still be disabled with withoutValidation. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7613b9b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7613b9b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7613b9b Branch: refs/heads/master Commit: d7613b9bbb8782a959c12453a5f28dcefeecb102 Parents: ab74eac Author: Ben Chambers <[email protected]> Authored: Sat Jun 25 14:11:17 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Sat Jun 25 23:10:25 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 47 ++++++++++++-------- .../org/apache/beam/sdk/io/BigQueryIOTest.java | 31 ++++++++++++- 2 files changed, 58 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7613b9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 1c666ed..6a36c8d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -389,6 +389,12 @@ 
public class BigQueryIO { public static class Bound extends PTransform<PInput, PCollection<TableRow>> { @Nullable final String jsonTableRef; @Nullable final String query; + + /** + * Disable validation that the table exists or the query succeeds prior to pipeline + * submission. Basic validation (such as ensuring that a query or table is specified) still + * occurs. + */ final boolean validate; @Nullable final Boolean flattenResults; @Nullable final BigQueryServices testBigQueryServices; @@ -467,7 +473,9 @@ public class BigQueryIO { } /** - * Disable table validation. + * Disable validation that the table exists or the query succeeds prior to pipeline + * submission. Basic validation (such as ensuring that a query or table is specified) still + * occurs. */ public Bound withoutValidation() { return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices); @@ -491,24 +499,27 @@ public class BigQueryIO { @Override public void validate(PInput input) { - if (validate) { - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - - TableReference table = getTableWithDefaultProject(bqOptions); - if (table == null && query == null) { - throw new IllegalStateException( - "Invalid BigQuery read operation, either table reference or query has to be set"); - } else if (table != null && query != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" - + " query and a table, only one of these should be provided"); - } else if (table != null && flattenResults != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" - + " table with a result flattening preference, which is not configurable"); - } else if (query != null && flattenResults == null) { - throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a" - + " query without a result flattening preference"); - } + // Even if existence validation is disabled, we need to make sure that the BigQueryIO + // read is properly specified. + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + TableReference table = getTableWithDefaultProject(bqOptions); + if (table == null && query == null) { + throw new IllegalStateException( + "Invalid BigQuery read operation, either table reference or query has to be set"); + } else if (table != null && query != null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" + + " query and a table, only one of these should be provided"); + } else if (table != null && flattenResults != null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" + + " table with a result flattening preference, which is not configurable"); + } else if (query != null && flattenResults == null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" + + " query without a result flattening preference"); + } + // Only verify existence/correctness if validation is enabled. + if (validate) { // Check for source table/query presence for early failure notification. // Note that a presence check can fail if the table or dataset are created by earlier // stages of the pipeline or if a query depends on earlier stages of a pipeline. 
For these http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7613b9b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 2a135ec..a1daf72 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -26,8 +26,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.eq; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; import org.apache.beam.sdk.Pipeline; @@ -473,6 +473,17 @@ public class BigQueryIOTest implements Serializable { @Test @Category(RunnableOnService.class) + public void testBuildSourceWithoutTableQueryOrValidation() { + Pipeline p = TestPipeline.create(); + thrown.expect(IllegalStateException.class); + thrown.expectMessage( + "Invalid BigQuery read operation, either table reference or query has to be set"); + p.apply(BigQueryIO.Read.withoutValidation()); + p.run(); + } + + @Test + @Category(RunnableOnService.class) public void testBuildSourceWithTableAndQuery() { Pipeline p = TestPipeline.create(); thrown.expect(IllegalStateException.class); @@ -502,6 +513,22 @@ public class BigQueryIOTest implements Serializable { } @Test + @Category(RunnableOnService.class) + public void testBuildSourceWithTableAndFlattenWithoutValidation() { + Pipeline p = TestPipeline.create(); + thrown.expect(IllegalStateException.class); + thrown.expectMessage( + "Invalid BigQuery read operation. 
Specifies a" + + " table with a result flattening preference, which is not configurable"); + p.apply( + BigQueryIO.Read.named("ReadMyTable") + .from("foo.com:project:somedataset.sometable") + .withoutValidation() + .withoutResultFlattening()); + p.run(); + } + + @Test @Category(NeedsRunner.class) public void testReadFromTable() { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
