Repository: incubator-beam
Updated Branches:
  refs/heads/master ecbc64117 -> 9b71f1636


Forward port Dataflow PR-454 to Beam


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ac0caf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ac0caf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ac0caf2

Branch: refs/heads/master
Commit: 0ac0caf2f78064a820f8a6ae23624162dcd1419f
Parents: cca861b
Author: Pei He <[email protected]>
Authored: Mon Oct 3 20:39:32 2016 -0700
Committer: Luke Cwik <[email protected]>
Committed: Thu Oct 6 15:33:03 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 34 +++++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  2 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |  8 ++---
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 11 +++++--
 4 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6d20c3f..eb98ea8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -553,7 +553,12 @@ public class BigQueryIO {
         } else if (validate && query != null) {
           JobService jobService = bigQueryServices.getJobService(bqOptions);
           try {
-            jobService.dryRunQuery(bqOptions.getProject(), query, 
useLegacySql);
+            jobService.dryRunQuery(
+                bqOptions.getProject(),
+                new JobConfigurationQuery()
+                    .setQuery(query)
+                    .setFlattenResults(flattenResults)
+                    .setUseLegacySql(useLegacySql));
           } catch (Exception e) {
             throw new IllegalArgumentException(
                 String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
@@ -926,10 +931,7 @@ public class BigQueryIO {
       executeQuery(
           executingProject,
           queryJobId,
-          query,
           tableToExtract,
-          flattenResults,
-          useLegacySql,
           bqServices.getJobService(bqOptions));
       return tableToExtract;
     }
@@ -955,34 +957,29 @@ public class BigQueryIO {
     private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions 
bqOptions)
         throws InterruptedException, IOException {
       if (dryRunJobStats.get() == null) {
-        JobStatistics jobStats =
-            bqServices.getJobService(bqOptions).dryRunQuery(executingProject, 
query, useLegacySql);
+        JobStatistics jobStats = 
bqServices.getJobService(bqOptions).dryRunQuery(
+            executingProject, createBasicQueryConfig());
         dryRunJobStats.compareAndSet(null, jobStats);
       }
       return dryRunJobStats.get();
     }
 
-    private static void executeQuery(
+    private void executeQuery(
         String executingProject,
         String jobId,
-        String query,
         TableReference destinationTable,
-        boolean flattenResults,
-        boolean useLegacySql,
         JobService jobService) throws IOException, InterruptedException {
       JobReference jobRef = new JobReference()
           .setProjectId(executingProject)
           .setJobId(jobId);
-      JobConfigurationQuery queryConfig = new JobConfigurationQuery();
-      queryConfig
-          .setQuery(query)
+
+      JobConfigurationQuery queryConfig = createBasicQueryConfig()
           .setAllowLargeResults(true)
           .setCreateDisposition("CREATE_IF_NEEDED")
           .setDestinationTable(destinationTable)
-          .setFlattenResults(flattenResults)
-          .setUseLegacySql(useLegacySql)
           .setPriority("BATCH")
           .setWriteDisposition("WRITE_EMPTY");
+
       jobService.startQueryJob(jobRef, queryConfig);
       Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
       if (parseStatus(job) != Status.SUCCEEDED) {
@@ -990,6 +987,13 @@ public class BigQueryIO {
       }
     }
 
+    private JobConfigurationQuery createBasicQueryConfig() {
+      return new JobConfigurationQuery()
+          .setQuery(query)
+          .setFlattenResults(flattenResults)
+          .setUseLegacySql(useLegacySql);
+    }
+
     private void readObject(ObjectInputStream in) throws 
ClassNotFoundException, IOException {
       in.defaultReadObject();
       dryRunJobStats = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index eb77f12..1d9fb28 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -98,7 +98,7 @@ interface BigQueryServices extends Serializable {
     /**
      * Dry runs the query in the given project.
      */
-    JobStatistics dryRunQuery(String projectId, String query, boolean 
useLegacySql)
+    JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig)
         throws InterruptedException, IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index ad2d4ed..3e057bb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -266,13 +266,11 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
 
     @Override
-    public JobStatistics dryRunQuery(String projectId, String query, boolean 
useLegacySql)
+    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig)
         throws InterruptedException, IOException {
       Job job = new Job()
           .setConfiguration(new JobConfiguration()
-              .setQuery(new JobConfigurationQuery()
-                  .setQuery(query)
-                  .setUseLegacySql(useLegacySql))
+              .setQuery(queryConfig)
               .setDryRun(true));
       BackOff backoff =
           FluentBackoff.DEFAULT
@@ -281,7 +279,7 @@ class BigQueryServicesImpl implements BigQueryServices {
           client.jobs().insert(projectId, job),
           String.format(
               "Unable to dry run query: %s, aborting after %d retries.",
-              query, MAX_RPC_RETRIES),
+              queryConfig, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff).getStatistics();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d2c6715..ab9716e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -129,6 +128,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -362,7 +362,7 @@ public class BigQueryIOTest implements Serializable {
     }
 
     @Override
-    public JobStatistics dryRunQuery(String projectId, String query, boolean 
useLegacySql)
+    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
query)
         throws InterruptedException, IOException {
       throw new UnsupportedOperationException();
     }
@@ -1226,7 +1226,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("testProejct")
         .setDatasetId("testDataset")
         .setTableId("testTable");
-    when(mockJobService.dryRunQuery(anyString(), anyString(), anyBoolean()))
+    when(mockJobService.dryRunQuery(anyString(), 
Mockito.<JobConfigurationQuery>any()))
         .thenReturn(new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)
@@ -1263,6 +1263,11 @@ public class BigQueryIOTest implements Serializable {
         .startExtractJob(Mockito.<JobReference>any(), 
Mockito.<JobConfigurationExtract>any());
     Mockito.verify(mockDatasetService)
         .createDataset(anyString(), anyString(), anyString(), anyString());
+    ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
+        ArgumentCaptor.forClass(JobConfigurationQuery.class);
+    Mockito.verify(mockJobService).dryRunQuery(anyString(), 
queryConfigArg.capture());
+    assertEquals(true, queryConfigArg.getValue().getFlattenResults());
+    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
   }
 
   @Test

Reply via email to