Forward port Dataflow PR-453 to Beam
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f33e869 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f33e869 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f33e869 Branch: refs/heads/master Commit: 9f33e8692fe4852b1825c5464eafa8d9e9786425 Parents: 0ac0caf Author: Pei He <[email protected]> Authored: Mon Oct 3 21:05:44 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Oct 6 15:33:04 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 24 +++--- .../sdk/io/gcp/bigquery/BigQueryServices.java | 3 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 19 +++-- .../gcp/bigquery/BigQueryTableRowIterator.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 83 ++++++++++++++++++++ 5 files changed, 112 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index eb98ea8..716fe39 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -528,6 +528,7 @@ public class BigQueryIO { checkState( table != null || query != null, "Invalid BigQueryIO.Read: one of table reference and query must be set"); + if (table != null) { checkState( flattenResults == null, @@ -910,21 +911,26 @@ public class BigQueryIO { protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException, InterruptedException { // 1. Find the location of the query. - TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions) - .getQuery() - .getReferencedTables() - .get(0); + String location = null; + List<TableReference> referencedTables = + dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables(); DatasetService tableService = bqServices.getDatasetService(bqOptions); - String location = tableService.getTable( - dryRunTempTable.getProjectId(), - dryRunTempTable.getDatasetId(), - dryRunTempTable.getTableId()).getLocation(); + if (referencedTables != null && !referencedTables.isEmpty()) { + TableReference queryTable = referencedTables.get(0); + location = tableService.getTable( + queryTable.getProjectId(), + queryTable.getDatasetId(), + queryTable.getTableId()).getLocation(); + } // 2. Create the temporary dataset in the query location. TableReference tableToExtract = JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class); tableService.createDataset( - tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, ""); + tableToExtract.getProjectId(), + tableToExtract.getDatasetId(), + location, + "Dataset for BigQuery query job temporary table"); // 3. Execute the query. String queryJobId = jobIdToken + "-query"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 1d9fb28..ca7e491 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -141,7 +141,8 @@ interface BigQueryServices extends Serializable { /** * Create a {@link Dataset} with the given {@code location} and {@code description}. */ - void createDataset(String projectId, String datasetId, String location, String description) + void createDataset( + String projectId, String datasetId, @Nullable String location, @Nullable String description) throws IOException, InterruptedException; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 3e057bb..61f1a1a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -472,7 +472,7 @@ class BigQueryServicesImpl implements BigQueryServices { */ @Override public void createDataset( - String projectId, String datasetId, String location, String description) + String projectId, String datasetId, @Nullable String location, @Nullable String description) throws IOException, InterruptedException { BackOff backoff = FluentBackoff.DEFAULT @@ -483,19 +483,22 @@ class BigQueryServicesImpl implements BigQueryServices { private void createDataset( String projectId, String datasetId, - String location, - String description, + @Nullable String location, + @Nullable String description, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException { DatasetReference datasetRef = new DatasetReference() .setProjectId(projectId) .setDatasetId(datasetId); - Dataset dataset = new Dataset() - .setDatasetReference(datasetRef) - .setLocation(location) - .setFriendlyName(location) - .setDescription(description); + Dataset dataset = new Dataset().setDatasetReference(datasetRef); + if (location != null) { + dataset.setLocation(location); + } + if (description != null) { + dataset.setFriendlyName(description); + dataset.setDescription(description); + } Exception lastException; do { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 608995a..92f7542 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -421,7 +421,7 @@ class BigQueryTableRowIterator implements AutoCloseable { queryConfig.setQuery(query); queryConfig.setAllowLargeResults(true); queryConfig.setFlattenResults(flattenResults); - queryConfig.setFlattenResults(useLegacySql); + queryConfig.setUseLegacySql(useLegacySql); TableReference destinationTable = new TableReference(); destinationTable.setProjectId(projectId); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index ab9716e..05a7c5c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1271,6 +1271,89 @@ public class BigQueryIOTest implements Serializable { } @Test + public void testBigQueryNoTableQuerySourceInitSplit() throws Exception { + TableReference dryRunTable = new TableReference(); + + Job queryJob = new Job(); + JobStatistics queryJobStats = new JobStatistics(); + JobStatistics2 queryStats = new JobStatistics2(); + queryStats.setReferencedTables(ImmutableList.of(dryRunTable)); + queryJobStats.setQuery(queryStats); + queryJob.setStatus(new JobStatus()) + .setStatistics(queryJobStats); + + Job extractJob = new Job(); + JobStatistics extractJobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + extractJobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(extractJobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String extractDestinationDir = "mock://tempLocation"; + TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name"); + BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( + jobIdToken, "query", destinationTable, true /* flattenResults */, true /* useLegacySql */, + extractDestinationDir, fakeBqServices); + + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(extractDestinationDir); + + when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any())) + .thenReturn(new JobStatistics().setQuery( + new JobStatistics2() + .setTotalBytesProcessed(100L))); + when(mockDatasetService.getTable( + eq(destinationTable.getProjectId()), + eq(destinationTable.getDatasetId()), + eq(destinationTable.getTableId()))) + .thenReturn(new Table().setSchema(new TableSchema())); + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) + .thenReturn(extractJob); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource<TableRow> actual = sources.get(0); + assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startQueryJob( + Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any()); + Mockito.verify(mockJobService) + .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); + Mockito.verify(mockDatasetService) + .createDataset(anyString(), anyString(), anyString(), anyString()); + ArgumentCaptor<JobConfigurationQuery> queryConfigArg = + ArgumentCaptor.forClass(JobConfigurationQuery.class); + Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture()); + assertEquals(true, queryConfigArg.getValue().getFlattenResults()); + assertEquals(true, queryConfigArg.getValue().getUseLegacySql()); + } + + @Test public void testTransformingSource() throws Exception { int numElements = 10000; @SuppressWarnings("deprecation")
