Repository: incubator-beam
Updated Branches:
  refs/heads/master ab74eac34 -> 4f580f5f1


[BEAM-377] Validate BigQueryIO.Read is properly configured

Previously, using withoutValidation would disable all validation,
leading to a NullPointerException if there wasn't a table or query
provided.

The intention of the withoutValidation parameter is to bypass more
expensive (and possibly incorrect) checks, such as the existence of
the table prior to pipeline execution in cases where earlier stages
create the table.

This moves the basic usage validation to always happen, while the
extended validation is still disabled by withoutValidation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7613b9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7613b9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7613b9b

Branch: refs/heads/master
Commit: d7613b9bbb8782a959c12453a5f28dcefeecb102
Parents: ab74eac
Author: Ben Chambers <[email protected]>
Authored: Sat Jun 25 14:11:17 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Sat Jun 25 23:10:25 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 47 ++++++++++++--------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 31 ++++++++++++-
 2 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7613b9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 1c666ed..6a36c8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -389,6 +389,12 @@ public class BigQueryIO {
     public static class Bound extends PTransform<PInput, 
PCollection<TableRow>> {
       @Nullable final String jsonTableRef;
       @Nullable final String query;
+
+      /**
+       * Disable validation that the table exists or the query succeeds prior 
to pipeline
+       * submission. Basic validation (such as ensuring that a query or table 
is specified) still
+       * occurs.
+       */
       final boolean validate;
       @Nullable final Boolean flattenResults;
       @Nullable final BigQueryServices testBigQueryServices;
@@ -467,7 +473,9 @@ public class BigQueryIO {
       }
 
       /**
-       * Disable table validation.
+       * Disable validation that the table exists or the query succeeds prior 
to pipeline
+       * submission. Basic validation (such as ensuring that a query or table 
is specified) still
+       * occurs.
        */
       public Bound withoutValidation() {
         return new Bound(name, query, jsonTableRef, false, flattenResults, 
testBigQueryServices);
@@ -491,24 +499,27 @@ public class BigQueryIO {
 
       @Override
       public void validate(PInput input) {
-        if (validate) {
-          BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-          TableReference table = getTableWithDefaultProject(bqOptions);
-          if (table == null && query == null) {
-            throw new IllegalStateException(
-                "Invalid BigQuery read operation, either table reference or 
query has to be set");
-          } else if (table != null && query != null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies both a"
-                + " query and a table, only one of these should be provided");
-          } else if (table != null && flattenResults != null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a"
-                + " table with a result flattening preference, which is not 
configurable");
-          } else if (query != null && flattenResults == null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a"
-                + " query without a result flattening preference");
-          }
+        // Even if existence validation is disabled, we need to make sure that 
the BigQueryIO
+        // read is properly specified.
+        BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+        TableReference table = getTableWithDefaultProject(bqOptions);
+        if (table == null && query == null) {
+          throw new IllegalStateException(
+              "Invalid BigQuery read operation, either table reference or 
query has to be set");
+        } else if (table != null && query != null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies both a"
+              + " query and a table, only one of these should be provided");
+        } else if (table != null && flattenResults != null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a"
+              + " table with a result flattening preference, which is not 
configurable");
+        } else if (query != null && flattenResults == null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a"
+              + " query without a result flattening preference");
+        }
 
+        // Only verify existence/correctness if validation is enabled.
+        if (validate) {
           // Check for source table/query presence for early failure 
notification.
           // Note that a presence check can fail if the table or dataset are 
created by earlier
           // stages of the pipeline or if a query depends on earlier stages of 
a pipeline. For these

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7613b9b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 2a135ec..a1daf72 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -26,8 +26,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline;
@@ -473,6 +473,17 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
+  public void testBuildSourceWithoutTableQueryOrValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation, either table reference or query has 
to be set");
+    p.apply(BigQueryIO.Read.withoutValidation());
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
   public void testBuildSourceWithTableAndQuery() {
     Pipeline p = TestPipeline.create();
     thrown.expect(IllegalStateException.class);
@@ -502,6 +513,22 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndFlattenWithoutValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not 
configurable");
+    p.apply(
+        BigQueryIO.Read.named("ReadMyTable")
+            .from("foo.com:project:somedataset.sometable")
+            .withoutValidation()
+            .withoutResultFlattening());
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testReadFromTable() {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()

Reply via email to