Repository: incubator-beam Updated Branches: refs/heads/master ecbc64117 -> 9b71f1636
Forward port Dataflow PR-454 to Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ac0caf2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ac0caf2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ac0caf2 Branch: refs/heads/master Commit: 0ac0caf2f78064a820f8a6ae23624162dcd1419f Parents: cca861b Author: Pei He <[email protected]> Authored: Mon Oct 3 20:39:32 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Oct 6 15:33:03 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 34 +++++++++++--------- .../sdk/io/gcp/bigquery/BigQueryServices.java | 2 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 8 ++--- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 11 +++++-- 4 files changed, 31 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 6d20c3f..eb98ea8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -553,7 +553,12 @@ public class BigQueryIO { } else if (validate && query != null) { JobService jobService = bigQueryServices.getJobService(bqOptions); try { - jobService.dryRunQuery(bqOptions.getProject(), query, useLegacySql); + jobService.dryRunQuery( + bqOptions.getProject(), + new JobConfigurationQuery() + .setQuery(query) + .setFlattenResults(flattenResults) + .setUseLegacySql(useLegacySql)); } catch (Exception e) { throw new IllegalArgumentException( String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); @@ -926,10 +931,7 @@ public class BigQueryIO { executeQuery( executingProject, queryJobId, - query, tableToExtract, - flattenResults, - useLegacySql, bqServices.getJobService(bqOptions)); return tableToExtract; } @@ -955,34 +957,29 @@ public class BigQueryIO { private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { - JobStatistics jobStats = - bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query, useLegacySql); + JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery( + executingProject, createBasicQueryConfig()); dryRunJobStats.compareAndSet(null, jobStats); } return dryRunJobStats.get(); } - private static void executeQuery( + private void executeQuery( String executingProject, String jobId, - String query, TableReference destinationTable, - boolean flattenResults, - boolean useLegacySql, JobService jobService) throws IOException, InterruptedException { JobReference jobRef = new JobReference() .setProjectId(executingProject) .setJobId(jobId); - JobConfigurationQuery queryConfig = new JobConfigurationQuery(); - queryConfig - .setQuery(query) + + JobConfigurationQuery queryConfig = createBasicQueryConfig() .setAllowLargeResults(true) .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) - .setFlattenResults(flattenResults) - .setUseLegacySql(useLegacySql) .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); + jobService.startQueryJob(jobRef, queryConfig); Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(job) != Status.SUCCEEDED) { @@ -990,6 +987,13 @@ public class BigQueryIO { } } + private JobConfigurationQuery createBasicQueryConfig() { + return new JobConfigurationQuery() + .setQuery(query) + .setFlattenResults(flattenResults) + .setUseLegacySql(useLegacySql); + } + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { in.defaultReadObject(); dryRunJobStats = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index eb77f12..1d9fb28 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -98,7 +98,7 @@ interface BigQueryServices extends Serializable { /** * Dry runs the query in the given project. */ - JobStatistics dryRunQuery(String projectId, String query, boolean useLegacySql) + JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig) throws InterruptedException, IOException; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index ad2d4ed..3e057bb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -266,13 +266,11 @@ class BigQueryServicesImpl implements BigQueryServices { } @Override - public JobStatistics dryRunQuery(String projectId, String query, boolean useLegacySql) + public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig) throws InterruptedException, IOException { Job job = new Job() .setConfiguration(new JobConfiguration() - .setQuery(new JobConfigurationQuery() - .setQuery(query) - .setUseLegacySql(useLegacySql)) + .setQuery(queryConfig) .setDryRun(true)); BackOff backoff = FluentBackoff.DEFAULT @@ -281,7 +279,7 @@ class BigQueryServicesImpl implements BigQueryServices { client.jobs().insert(projectId, job), String.format( "Unable to dry run query: %s, aborting after %d retries.", - query, MAX_RPC_RETRIES), + queryConfig, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff).getStatistics(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index d2c6715..ab9716e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; @@ -129,6 +128,7 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -362,7 +362,7 @@ public class BigQueryIOTest implements Serializable { } @Override - public JobStatistics dryRunQuery(String projectId, String query, boolean useLegacySql) + public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query) throws InterruptedException, IOException { throw new UnsupportedOperationException(); } @@ -1226,7 +1226,7 @@ public class BigQueryIOTest implements Serializable { .setProjectId("testProejct") .setDatasetId("testDataset") .setTableId("testTable"); - when(mockJobService.dryRunQuery(anyString(), anyString(), anyBoolean())) + when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) .thenReturn(new JobStatistics().setQuery( new JobStatistics2() .setTotalBytesProcessed(100L) @@ -1263,6 +1263,11 @@ public class BigQueryIOTest implements Serializable { .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); Mockito.verify(mockDatasetService) .createDataset(anyString(), anyString(), anyString(), anyString()); + ArgumentCaptor<JobConfigurationQuery> queryConfigArg = + ArgumentCaptor.forClass(JobConfigurationQuery.class); + Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture()); + assertEquals(true, queryConfigArg.getValue().getFlattenResults()); + assertEquals(true, queryConfigArg.getValue().getUseLegacySql()); } @Test
