[BEAM-48] Add BigQueryTornadoes integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89d20a2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89d20a2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89d20a2d Branch: refs/heads/master Commit: 89d20a2d66319269082cdead70eb3cf10309b9e8 Parents: d795295 Author: Pei He <pe...@google.com> Authored: Wed May 18 17:46:24 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri May 20 16:37:39 2016 -0700 ---------------------------------------------------------------------- .../examples/cookbook/BigQueryTornadoes.java | 2 +- .../examples/cookbook/BigQueryTornadoesIT.java | 52 ++++++++++++ .../java/org/apache/beam/sdk/io/BigQueryIO.java | 87 ++++++++++++-------- .../org/apache/beam/sdk/io/BigQueryIOTest.java | 17 ++-- 4 files changed, 112 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java index 80a2f25..4c69efb 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java @@ -143,7 +143,7 @@ public class BigQueryTornadoes { * * <p>Inherits standard configuration options. */ - private static interface Options extends PipelineOptions { + static interface Options extends PipelineOptions { @Description("Table to read from, specified as " + "<project_id>:<dataset_id>.<table_id>") @Default.String(WEATHER_SAMPLES_TABLE) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java new file mode 100644 index 0000000..fbd775c --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.examples.cookbook; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * End-to-end tests of BigQueryTornadoes. + */ +@RunWith(JUnit4.class) +public class BigQueryTornadoesIT { + + /** + * Options for the BigQueryTornadoes Integration Test. + */ + public interface BigQueryTornadoesITOptions + extends TestPipelineOptions, BigQueryTornadoes.Options { + } + + @Test + public void testE2EBigQueryTornadoes() throws Exception { + PipelineOptionsFactory.register(BigQueryTornadoesITOptions.class); + BigQueryTornadoesITOptions options = + TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class); + options.setOutput(String.format("%s.%s", + "BigQueryTornadoesIT", "monthly_tornadoes_" + System.currentTimeMillis())); + + BigQueryTornadoes.main(TestPipeline.convertToArgs(options)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index e4a306a..030dde0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -559,34 +559,23 @@ public class BigQueryIO { String.format("Failed to resolve extract destination directory in %s", tempLocation)); } + final String executingProject = bqOptions.getProject(); if (!Strings.isNullOrEmpty(query)) { - String projectId = bqOptions.getProject(); String queryTempDatasetId = "temp_dataset_" + uuid; String queryTempTableId = "temp_table_" + uuid; TableReference queryTempTableRef = new TableReference() - .setProjectId(projectId) + .setProjectId(executingProject) .setDatasetId(queryTempDatasetId) .setTableId(queryTempTableId); - String jsonQueryTempTable; - try { - jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize table to JSON strings.", e); - } source = BigQueryQuerySource.create( - jobIdToken, query, jsonQueryTempTable, flattenResults, + jobIdToken, query, queryTempTableRef, flattenResults, extractDestinationDir, bqServices); } else { - String jsonTable; - try { - jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions)); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize table to JSON strings.", e); - } + TableReference inputTable = getTableWithDefaultProject(bqOptions); source = BigQueryTableSource.create( - jobIdToken, jsonTable, extractDestinationDir, bqServices); + jobIdToken, inputTable, extractDestinationDir, bqServices, executingProject); } PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @@ -595,7 +584,7 @@ public class BigQueryIO { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); JobReference jobRef = new JobReference() - .setProjectId(bqOptions.getProject()) + .setProjectId(executingProject) .setJobId(getExtractJobId(jobIdToken)); Job extractJob = bqServices.getJobService(bqOptions).pollJob( jobRef, CLEANUP_JOB_POLL_MAX_RETRIES); @@ -759,10 +748,12 @@ public class BigQueryIO { static BigQueryTableSource create( String jobIdToken, - String jsonTable, + TableReference table, String extractDestinationDir, - BigQueryServices bqServices) { - return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices); + BigQueryServices bqServices, + String executingProject) { + return new BigQueryTableSource( + jobIdToken, table, extractDestinationDir, bqServices, executingProject); } private final String jsonTable; @@ -770,11 +761,17 @@ public class BigQueryIO { private BigQueryTableSource( String jobIdToken, - String jsonTable, + TableReference table, String extractDestinationDir, - BigQueryServices bqServices) { - super(jobIdToken, extractDestinationDir, bqServices); - this.jsonTable = checkNotNull(jsonTable, "jsonTable"); + BigQueryServices bqServices, + String executingProject) { + super(jobIdToken, extractDestinationDir, bqServices, executingProject); + checkNotNull(table, "table"); + try { + this.jsonTable = JSON_FACTORY.toString(table); + } catch (IOException e) { + throw new RuntimeException("Cannot initialize table to JSON strings.", e); + } this.tableSizeBytes = new AtomicReference<>(); } @@ -824,12 +821,17 @@ public class BigQueryIO { static BigQueryQuerySource create( String jobIdToken, String query, - String jsonQueryTempTable, + TableReference queryTempTableRef, Boolean flattenResults, String extractDestinationDir, BigQueryServices bqServices) { return new BigQueryQuerySource( - jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices); + jobIdToken, + query, + queryTempTableRef, + flattenResults, + extractDestinationDir, + bqServices); } private final String query; @@ -840,13 +842,18 @@ public class BigQueryIO { private BigQueryQuerySource( String jobIdToken, String query, - String jsonQueryTempTable, + TableReference queryTempTableRef, Boolean flattenResults, String extractDestinationDir, BigQueryServices bqServices) { - super(jobIdToken, extractDestinationDir, bqServices); + super(jobIdToken, extractDestinationDir, bqServices, + checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId()); this.query = checkNotNull(query, "query"); - this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable"); + try { + this.jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef); + } catch (IOException e) { + throw new RuntimeException("Cannot initialize table to JSON strings.", e); + } this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.dryRunJobStats = new AtomicReference<>(); } @@ -861,7 +868,7 @@ public class BigQueryIO { public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, query, bqOptions.getProject(), flattenResults)); + bqOptions, query, executingProject, flattenResults)); } @Override @@ -887,7 +894,12 @@ public class BigQueryIO { // 3. Execute the query. String queryJobId = jobIdToken + "-query"; executeQuery( - queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions)); + executingProject, + queryJobId, + query, + tableToExtract, + flattenResults, + bqServices.getJobService(bqOptions)); return tableToExtract; } @@ -912,22 +924,22 @@ public class BigQueryIO { private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { - String projectId = bqOptions.getProject(); JobStatistics jobStats = - bqServices.getJobService(bqOptions).dryRunQuery(projectId, query); + bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query); dryRunJobStats.compareAndSet(null, jobStats); } return dryRunJobStats.get(); } private static void executeQuery( + String executingProject, String jobId, String query, TableReference destinationTable, boolean flattenResults, JobService jobService) throws IOException, InterruptedException { JobReference jobRef = new JobReference() - .setProjectId(destinationTable.getProjectId()) + .setProjectId(executingProject) .setJobId(jobId); JobConfigurationQuery queryConfig = new JobConfigurationQuery(); queryConfig @@ -978,14 +990,17 @@ public class BigQueryIO { protected final String jobIdToken; protected final String extractDestinationDir; protected final BigQueryServices bqServices; + protected final String executingProject; private BigQuerySourceBase( String jobIdToken, String extractDestinationDir, - BigQueryServices bqServices) { + BigQueryServices bqServices, + String executingProject) { this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); this.bqServices = checkNotNull(bqServices, "bqServices"); + this.executingProject = checkNotNull(executingProject, "executingProject"); } @Override @@ -1029,7 +1044,7 @@ public class BigQueryIO { String jobId, TableReference table, JobService jobService) throws InterruptedException, IOException { JobReference jobRef = new JobReference() - .setProjectId(table.getProjectId()) + .setProjectId(executingProject) .setJobId(jobId); String destinationUri = getExtractDestinationUri(extractDestinationDir); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 6849018..2d1b550 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -870,10 +870,10 @@ public class BigQueryIOTest implements Serializable { toJsonString(new TableRow().set("name", "c").set("number", "3"))); String jobIdToken = "testJobIdToken"; - String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name"); String extractDestinationDir = "mock://tempLocation"; - BoundedSource<TableRow> bqSource = - BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + BoundedSource<TableRow> bqSource = BigQueryTableSource.create( + jobIdToken, table, extractDestinationDir, fakeBqServices, "project"); List<TableRow> expected = ImmutableList.of( new TableRow().set("name", "a").set("number", "1"), @@ -907,10 +907,10 @@ public class BigQueryIOTest implements Serializable { toJsonString(new TableRow().set("name", "c").set("number", "3"))); String jobIdToken = "testJobIdToken"; - String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name"); String extractDestinationDir = "mock://tempLocation"; - BoundedSource<TableRow> bqSource = - BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + BoundedSource<TableRow> bqSource = BigQueryTableSource.create( + jobIdToken, table, extractDestinationDir, fakeBqServices, "project"); List<TableRow> expected = ImmutableList.of( new TableRow().set("name", "a").set("number", "1"), @@ -973,10 +973,9 @@ public class BigQueryIOTest implements Serializable { String jobIdToken = "testJobIdToken"; String extractDestinationDir = "mock://tempLocation"; - TableReference destinationTable = BigQueryIO.parseTableSpec("project.data_set.table_name"); - String jsonDestinationTable = toJsonString(destinationTable); + TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name"); BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( - jobIdToken, "query", jsonDestinationTable, true /* flattenResults */, + jobIdToken, "query", destinationTable, true /* flattenResults */, extractDestinationDir, fakeBqServices); List<TableRow> expected = ImmutableList.of(