Forward port Dataflow PR-431 to Beam

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cca861ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cca861ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cca861ba

Branch: refs/heads/master
Commit: cca861ba82a2e6ba6c6af122be0b8a9932d53cc5
Parents: ecbc641
Author: Pei He <pe...@google.com>
Authored: Mon Oct 3 19:37:02 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:33:03 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/PropertyNames.java |   1 +
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 129 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |   5 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |  16 ++-
 .../gcp/bigquery/BigQueryTableRowIterator.java  |  14 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  44 +++++--
 .../bigquery/BigQueryTableRowIteratorTest.java  |   6 +-
 7 files changed, 149 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index cc9fa5e..b17bcad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -30,6 +30,7 @@ public class PropertyNames {
   public static final String BIGQUERY_TABLE = "table";
   public static final String BIGQUERY_QUERY = "bigquery_query";
   public static final String BIGQUERY_FLATTEN_RESULTS = 
"bigquery_flatten_results";
+  public static final String BIGQUERY_USE_LEGACY_SQL = 
"bigquery_use_legacy_sql";
   public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
   public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
   public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 91f6073..6d20c3f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -384,6 +384,7 @@ public class BigQueryIO {
        */
       final boolean validate;
       @Nullable final Boolean flattenResults;
+      @Nullable final Boolean useLegacySql;
       @Nullable BigQueryServices bigQueryServices;
 
       private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -397,17 +398,20 @@ public class BigQueryIO {
             null /* jsonTableRef */,
             true /* validate */,
             null /* flattenResults */,
+            null /* useLegacySql */,
             null /* bigQueryServices */);
       }
 
       private Bound(
           String name, @Nullable String query, @Nullable String jsonTableRef, 
boolean validate,
-          @Nullable Boolean flattenResults, @Nullable BigQueryServices 
bigQueryServices) {
+          @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
+          @Nullable BigQueryServices bigQueryServices) {
         super(name);
         this.jsonTableRef = jsonTableRef;
         this.query = query;
         this.validate = validate;
         this.flattenResults = flattenResults;
+        this.useLegacySql = useLegacySql;
         this.bigQueryServices = bigQueryServices;
       }
 
@@ -428,7 +432,8 @@ public class BigQueryIO {
        */
       public Bound from(TableReference table) {
         return new Bound(
-            name, query, toJsonString(table), validate, flattenResults, 
bigQueryServices);
+            name, query, toJsonString(table), validate, flattenResults, 
useLegacySql,
+            bigQueryServices);
       }
 
       /**
@@ -440,10 +445,15 @@ public class BigQueryIO {
        * "flattenResults" in the <a 
href="https://cloud.google.com/bigquery/docs/reference/v2/jobs";>
        * Jobs documentation</a> for more information.  To disable flattening, 
use
        * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
+       *
+       * <p>By default, the query will use BigQuery's legacy SQL dialect. To 
use the BigQuery
+       * Standard SQL dialect, use {@link 
BigQueryIO.Read.Bound#usingStandardSql}.
        */
       public Bound fromQuery(String query) {
         return new Bound(name, query, jsonTableRef, validate,
-            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), 
bigQueryServices);
+            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
+            MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE),
+            bigQueryServices);
       }
 
       /**
@@ -452,7 +462,9 @@ public class BigQueryIO {
        * occurs.
        */
       public Bound withoutValidation() {
-        return new Bound(name, query, jsonTableRef, false, flattenResults, 
bigQueryServices);
+        return new Bound(
+            name, query, jsonTableRef, false /* validate */, flattenResults, 
useLegacySql,
+            bigQueryServices);
       }
 
       /**
@@ -463,12 +475,27 @@ public class BigQueryIO {
        * from a table will cause an error during validation.
        */
       public Bound withoutResultFlattening() {
-        return new Bound(name, query, jsonTableRef, validate, false, 
bigQueryServices);
+        return new Bound(
+            name, query, jsonTableRef, validate, false /* flattenResults */, 
useLegacySql,
+            bigQueryServices);
+      }
+
+      /**
+       * Enables BigQuery's Standard SQL dialect when reading from a query.
+       *
+       * <p>Only valid when a query is used ({@link #fromQuery}). Setting this 
option when reading
+       * from a table will cause an error during validation.
+       */
+      public Bound usingStandardSql() {
+        return new Bound(
+            name, query, jsonTableRef, validate, flattenResults, false /* 
useLegacySql */,
+            bigQueryServices);
       }
 
       @VisibleForTesting
       Bound withTestServices(BigQueryServices testServices) {
-        return new Bound(name, query, jsonTableRef, validate, flattenResults, 
testServices);
+        return new Bound(
+            name, query, jsonTableRef, validate, flattenResults, useLegacySql, 
testServices);
       }
 
       @Override
@@ -494,39 +521,42 @@ public class BigQueryIO {
         }
 
         TableReference table = getTableWithDefaultProject(bqOptions);
-        if (table == null && query == null) {
-          throw new IllegalStateException(
-              "Invalid BigQuery read operation, either table reference or 
query has to be set");
-        } else if (table != null && query != null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies both a"
-              + " query and a table, only one of these should be provided");
-        } else if (table != null && flattenResults != null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a"
-              + " table with a result flattening preference, which is not 
configurable");
-        } else if (query != null && flattenResults == null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a"
-              + " query without a result flattening preference");
+
+        checkState(
+            table == null || query == null,
+            "Invalid BigQueryIO.Read: table reference and query may not both 
be set");
+        checkState(
+            table != null || query != null,
+            "Invalid BigQueryIO.Read: one of table reference and query must be 
set");
+        if (table != null) {
+          checkState(
+              flattenResults == null,
+              "Invalid BigQueryIO.Read: Specifies a table with a result 
flattening"
+                  + " preference, which only applies to queries");
+          checkState(
+              useLegacySql == null,
+              "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+                  + " preference, which only applies to queries");
+        } else /* query != null */ {
+          checkState(flattenResults != null, "flattenResults should not be 
null if query is set");
+          checkState(useLegacySql != null, "useLegacySql should not be null if 
query is set");
         }
 
-        if (validate) {
-          BigQueryServices bqServices = getBigQueryServices();
-          // Check for source table/query presence for early failure 
notification.
-          // Note that a presence check can fail if the table or dataset are 
created by earlier
-          // stages of the pipeline or if a query depends on earlier stages of 
a pipeline. For these
-          // cases the withoutValidation method can be used to disable the 
check.
-          if (table != null) {
-            DatasetService datasetService = 
bqServices.getDatasetService(bqOptions);
-            verifyDatasetPresence(datasetService, table);
-            verifyTablePresence(datasetService, table);
-          }
-          if (query != null) {
-            JobService jobService = bqServices.getJobService(bqOptions);
-            try {
-              jobService.dryRunQuery(bqOptions.getProject(), query);
-            } catch (Exception e) {
-              throw new IllegalArgumentException(
-                  String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
-            }
+        // Note that a table or query check can fail if the table or dataset 
are created by
+        // earlier stages of the pipeline or if a query depends on earlier 
stages of a pipeline.
+        // For these cases the withoutValidation method can be used to disable 
the check.
+        if (validate && table != null) {
+          // Check for source table presence for early failure notification.
+          DatasetService datasetService = 
bigQueryServices.getDatasetService(bqOptions);
+          verifyDatasetPresence(datasetService, table);
+          verifyTablePresence(datasetService, table);
+        } else if (validate && query != null) {
+          JobService jobService = bigQueryServices.getJobService(bqOptions);
+          try {
+            jobService.dryRunQuery(bqOptions.getProject(), query, 
useLegacySql);
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+                String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
           }
         }
       }
@@ -562,7 +592,7 @@ public class BigQueryIO {
               .setTableId(queryTempTableId);
 
           source = BigQueryQuerySource.create(
-              jobIdToken, query, queryTempTableRef, flattenResults,
+              jobIdToken, query, queryTempTableRef, flattenResults, 
useLegacySql,
               extractDestinationDir, bqServices);
         } else {
           TableReference inputTable = getTableWithDefaultProject(bqOptions);
@@ -621,6 +651,8 @@ public class BigQueryIO {
               .withLabel("Query"))
             .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
               .withLabel("Flatten Query Results"))
+            .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+              .withLabel("Use Legacy SQL Dialect"))
             .addIfNotDefault(DisplayData.item("validation", validate)
               .withLabel("Validation Enabled"),
                 true);
@@ -670,6 +702,15 @@ public class BigQueryIO {
         return flattenResults;
       }
 
+      /**
+       * Returns true (false) if the query will (will not) use BigQuery's 
legacy SQL mode, or null
+       * if not applicable.
+       */
+      @Nullable
+      public Boolean getUseLegacySql() {
+        return useLegacySql;
+      }
+
       private BigQueryServices getBigQueryServices() {
         if (bigQueryServices == null) {
           bigQueryServices = new BigQueryServicesImpl();
@@ -811,6 +852,7 @@ public class BigQueryIO {
         String query,
         TableReference queryTempTableRef,
         Boolean flattenResults,
+        Boolean useLegacySql,
         String extractDestinationDir,
         BigQueryServices bqServices) {
       return new BigQueryQuerySource(
@@ -818,6 +860,7 @@ public class BigQueryIO {
           query,
           queryTempTableRef,
           flattenResults,
+          useLegacySql,
           extractDestinationDir,
           bqServices);
     }
@@ -825,6 +868,7 @@ public class BigQueryIO {
     private final String query;
     private final String jsonQueryTempTable;
     private final Boolean flattenResults;
+    private final Boolean useLegacySql;
     private transient AtomicReference<JobStatistics> dryRunJobStats;
 
     private BigQueryQuerySource(
@@ -832,6 +876,7 @@ public class BigQueryIO {
         String query,
         TableReference queryTempTableRef,
         Boolean flattenResults,
+        Boolean useLegacySql,
         String extractDestinationDir,
         BigQueryServices bqServices) {
       super(jobIdToken, extractDestinationDir, bqServices,
@@ -839,6 +884,7 @@ public class BigQueryIO {
       this.query = checkNotNull(query, "query");
       this.jsonQueryTempTable = toJsonString(queryTempTableRef);
       this.flattenResults = checkNotNull(flattenResults, "flattenResults");
+      this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
       this.dryRunJobStats = new AtomicReference<>();
     }
 
@@ -852,7 +898,7 @@ public class BigQueryIO {
     public BoundedReader<TableRow> createReader(PipelineOptions options) 
throws IOException {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       return new BigQueryReader(this, bqServices.getReaderFromQuery(
-          bqOptions, query, executingProject, flattenResults));
+          bqOptions, query, executingProject, flattenResults, useLegacySql));
     }
 
     @Override
@@ -883,6 +929,7 @@ public class BigQueryIO {
           query,
           tableToExtract,
           flattenResults,
+          useLegacySql,
           bqServices.getJobService(bqOptions));
       return tableToExtract;
     }
@@ -909,7 +956,7 @@ public class BigQueryIO {
         throws InterruptedException, IOException {
       if (dryRunJobStats.get() == null) {
         JobStatistics jobStats =
-            bqServices.getJobService(bqOptions).dryRunQuery(executingProject, 
query);
+            bqServices.getJobService(bqOptions).dryRunQuery(executingProject, 
query, useLegacySql);
         dryRunJobStats.compareAndSet(null, jobStats);
       }
       return dryRunJobStats.get();
@@ -921,6 +968,7 @@ public class BigQueryIO {
         String query,
         TableReference destinationTable,
         boolean flattenResults,
+        boolean useLegacySql,
         JobService jobService) throws IOException, InterruptedException {
       JobReference jobRef = new JobReference()
           .setProjectId(executingProject)
@@ -932,6 +980,7 @@ public class BigQueryIO {
           .setCreateDisposition("CREATE_IF_NEEDED")
           .setDestinationTable(destinationTable)
           .setFlattenResults(flattenResults)
+          .setUseLegacySql(useLegacySql)
           .setPriority("BATCH")
           .setWriteDisposition("WRITE_EMPTY");
       jobService.startQueryJob(jobRef, queryConfig);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 16b3a39..eb77f12 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -57,7 +57,8 @@ interface BigQueryServices extends Serializable {
    * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
    */
   BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten);
+      BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten,
+      @Nullable Boolean useLegacySql);
 
   /**
    * An interface for the Cloud BigQuery load service.
@@ -97,7 +98,7 @@ interface BigQueryServices extends Serializable {
     /**
      * Dry runs the query in the given project.
      */
-    JobStatistics dryRunQuery(String projectId, String query)
+    JobStatistics dryRunQuery(String projectId, String query, boolean 
useLegacySql)
         throws InterruptedException, IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3862382..ad2d4ed 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -97,8 +97,9 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   @Override
   public BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten) {
-    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, 
flatten);
+      BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten,
+      @Nullable Boolean useLegacySql) {
+    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, 
flatten, useLegacySql);
   }
 
   @VisibleForTesting
@@ -265,12 +266,13 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
 
     @Override
-    public JobStatistics dryRunQuery(String projectId, String query)
+    public JobStatistics dryRunQuery(String projectId, String query, boolean 
useLegacySql)
         throws InterruptedException, IOException {
       Job job = new Job()
           .setConfiguration(new JobConfiguration()
               .setQuery(new JobConfigurationQuery()
-                  .setQuery(query))
+                  .setQuery(query)
+                  .setUseLegacySql(useLegacySql))
               .setDryRun(true));
       BackOff backoff =
           FluentBackoff.DEFAULT
@@ -697,10 +699,12 @@ class BigQueryServicesImpl implements BigQueryServices {
         BigQueryOptions bqOptions,
         String query,
         String projectId,
-        @Nullable Boolean flattenResults) {
+        @Nullable Boolean flattenResults,
+        @Nullable Boolean useLegacySql) {
       return new BigQueryJsonReaderImpl(
           BigQueryTableRowIterator.fromQuery(
-              query, projectId, 
Transport.newBigQueryClient(bqOptions).build(), flattenResults));
+              query, projectId, 
Transport.newBigQueryClient(bqOptions).build(), flattenResults,
+              useLegacySql));
     }
 
     private static BigQueryJsonReader fromTable(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 64b1dc6..608995a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -91,6 +91,8 @@ class BigQueryTableRowIterator implements AutoCloseable {
   private final String query;
   // Whether to flatten query results.
   private final boolean flattenResults;
+  // Whether to use the BigQuery legacy SQL dialect..
+  private final boolean useLegacySql;
   // Temporary dataset used to store query results.
   private String temporaryDatasetId = null;
   // Temporary table used to store query results.
@@ -98,12 +100,13 @@ class BigQueryTableRowIterator implements AutoCloseable {
 
   private BigQueryTableRowIterator(
       @Nullable TableReference ref, @Nullable String query, @Nullable String 
projectId,
-      Bigquery client, boolean flattenResults) {
+      Bigquery client, boolean flattenResults, boolean useLegacySql) {
     this.ref = ref;
     this.query = query;
     this.projectId = projectId;
     this.client = checkNotNull(client, "client");
     this.flattenResults = flattenResults;
+    this.useLegacySql = useLegacySql;
   }
 
   /**
@@ -112,7 +115,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
   public static BigQueryTableRowIterator fromTable(TableReference ref, 
Bigquery client) {
     checkNotNull(ref, "ref");
     checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, 
true);
+    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, 
true, true);
   }
 
   /**
@@ -120,12 +123,14 @@ class BigQueryTableRowIterator implements AutoCloseable {
    * specified query in the specified project.
    */
   public static BigQueryTableRowIterator fromQuery(
-      String query, String projectId, Bigquery client, @Nullable Boolean 
flattenResults) {
+      String query, String projectId, Bigquery client, @Nullable Boolean 
flattenResults,
+      @Nullable Boolean useLegacySql) {
     checkNotNull(query, "query");
     checkNotNull(projectId, "projectId");
     checkNotNull(client, "client");
     return new BigQueryTableRowIterator(null, query, projectId, client,
-        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
+        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
+        MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
   }
 
   /**
@@ -416,6 +421,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
     queryConfig.setQuery(query);
     queryConfig.setAllowLargeResults(true);
     queryConfig.setFlattenResults(flattenResults);
+    queryConfig.setFlattenResults(useLegacySql);
 
     TableReference destinationTable = new TableReference();
     destinationTable.setProjectId(projectId);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 78a4c4f..d2c6715 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -182,7 +183,8 @@ public class BigQueryIOTest implements Serializable {
 
     @Override
     public BigQueryJsonReader getReaderFromQuery(
-        BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten) {
+        BigQueryOptions bqOptions, String query, String projectId, @Nullable 
Boolean flatten,
+        @Nullable Boolean useLegacySql) {
       return new FakeBigQueryReader(jsonTableRowReturns);
     }
 
@@ -360,7 +362,7 @@ public class BigQueryIOTest implements Serializable {
     }
 
     @Override
-    public JobStatistics dryRunQuery(String projectId, String query)
+    public JobStatistics dryRunQuery(String projectId, String query, boolean 
useLegacySql)
         throws InterruptedException, IOException {
       throw new UnsupportedOperationException();
     }
@@ -537,7 +539,7 @@ public class BigQueryIOTest implements Serializable {
     Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
-        "Invalid BigQuery read operation, either table reference or query has 
to be set");
+        "Invalid BigQueryIO.Read: one of table reference and query must be 
set");
     p.apply(BigQueryIO.Read.withoutValidation());
     p.run();
   }
@@ -552,8 +554,7 @@ public class BigQueryIOTest implements Serializable {
     Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
-        "Invalid BigQuery read operation. Specifies both a query and a table, 
only one of these"
-        + " should be provided");
+        "Invalid BigQueryIO.Read: table reference and query may not both be 
set");
     p.apply("ReadMyTable",
         BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
@@ -571,8 +572,8 @@ public class BigQueryIOTest implements Serializable {
     Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
-        "Invalid BigQuery read operation. Specifies a"
-              + " table with a result flattening preference, which is not 
configurable");
+        "Invalid BigQueryIO.Read: Specifies a table with a result flattening 
preference,"
+            + " which only applies to queries");
     p.apply("ReadMyTable",
         BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
@@ -590,8 +591,8 @@ public class BigQueryIOTest implements Serializable {
     Pipeline p = TestPipeline.create(bqOptions);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(
-        "Invalid BigQuery read operation. Specifies a"
-              + " table with a result flattening preference, which is not 
configurable");
+        "Invalid BigQueryIO.Read: Specifies a table with a result flattening 
preference,"
+            + " which only applies to queries");
     p.apply(
         BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
@@ -601,6 +602,25 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndSqlDialect() {
+    BigQueryOptions bqOptions = 
PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect 
preference,"
+            + " which only applies to queries");
+    p.apply(
+        BigQueryIO.Read
+            .from("foo.com:project:somedataset.sometable")
+            .usingStandardSql());
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testReadFromTable() throws IOException {
     BigQueryOptions bqOptions = 
TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
@@ -712,6 +732,7 @@ public class BigQueryIOTest implements Serializable {
         .from(tableSpec)
         .fromQuery("myQuery")
         .withoutResultFlattening()
+        .usingStandardSql()
         .withoutValidation();
 
     DisplayData displayData = DisplayData.from(read);
@@ -719,6 +740,7 @@ public class BigQueryIOTest implements Serializable {
     assertThat(displayData, hasDisplayItem("table", tableSpec));
     assertThat(displayData, hasDisplayItem("query", "myQuery"));
     assertThat(displayData, hasDisplayItem("flattenResults", false));
+    assertThat(displayData, hasDisplayItem("useLegacySql", false));
     assertThat(displayData, hasDisplayItem("validation", false));
   }
 
@@ -1189,7 +1211,7 @@ public class BigQueryIOTest implements Serializable {
     String extractDestinationDir = "mock://tempLocation";
     TableReference destinationTable = 
BigQueryIO.parseTableSpec("project:data_set.table_name");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        jobIdToken, "query", destinationTable, true /* flattenResults */,
+        jobIdToken, "query", destinationTable, true /* flattenResults */, true 
/* useLegacySql */,
         extractDestinationDir, fakeBqServices);
 
     List<TableRow> expected = ImmutableList.of(
@@ -1204,7 +1226,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("testProejct")
         .setDatasetId("testDataset")
         .setTableId("testTable");
-    when(mockJobService.dryRunQuery(anyString(), anyString()))
+    when(mockJobService.dryRunQuery(anyString(), anyString(), anyBoolean()))
         .thenReturn(new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
index 29a1704..a41b455 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
@@ -199,7 +199,7 @@ public class BigQueryTableRowIteratorTest {
     String query = "SELECT name, count, photo, anniversary_date, "
         + "anniversary_datetime, anniversary_time from table";
     try (BigQueryTableRowIterator iterator =
-            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
+            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null, null)) {
       iterator.open();
       assertTrue(iterator.advance());
       TableRow row = iterator.getCurrent();
@@ -281,7 +281,7 @@ public class BigQueryTableRowIteratorTest {
         "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo",
         photoBytesEncoded);
     try (BigQueryTableRowIterator iterator =
-        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
+        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, 
null)) {
       iterator.open();
       assertTrue(iterator.advance());
       TableRow row = iterator.getCurrent();
@@ -334,7 +334,7 @@ public class BigQueryTableRowIteratorTest {
 
     String query = "NOT A QUERY";
     try (BigQueryTableRowIterator iterator =
-        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, 
null)) {
+        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, 
null)) {
 
       try {
         iterator.open();

Reply via email to