Repository: incubator-beam Updated Branches: refs/heads/master b304d037f -> 84e8bfb13
Forward port DataflowJavaSDK-337 to Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31bd5ba4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31bd5ba4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31bd5ba4 Branch: refs/heads/master Commit: 31bd5ba4bc44862c8042cdf6a79be8a6ebda49a4 Parents: b304d03 Author: Pei He <[email protected]> Authored: Mon Sep 12 20:38:28 2016 -0700 Committer: Pei He <[email protected]> Committed: Mon Sep 12 20:49:50 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 16 ++++++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 57 ++++++++++++++------ 2 files changed, 58 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31bd5ba4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 6dde581..1306e59 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -481,6 +481,22 @@ public class BigQueryIO { // read is properly specified. BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + String tempLocation = bqOptions.getTempLocation(); + checkArgument( + !Strings.isNullOrEmpty(tempLocation), + "BigQueryIO.Read needs a GCS temp location to store temp files."); + if (bigQueryServices == null) { + try { + GcsPath.fromUri(tempLocation); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format( + "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", + tempLocation), + e); + } + } + TableReference table = getTableWithDefaultProject(bqOptions); if (table == null && query == null) { throw new IllegalStateException( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31bd5ba4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 57eb4ff..19eeca5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -360,8 +360,6 @@ public class BigQueryIOTest implements Serializable { @Mock private transient IOChannelFactory mockIOChannelFactory; @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService; - private transient BigQueryOptions bqOptions; - private void checkReadTableObject( BigQueryIO.Read.Bound bound, String project, String dataset, String table) { checkReadTableObjectWithValidate(bound, project, dataset, table, true); @@ -410,10 +408,6 @@ public class BigQueryIOTest implements Serializable { @Before public void setUp() throws IOException { - bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - bqOptions.setProject("defaultProject"); - bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - MockitoAnnotations.initMocks(this); } @@ -468,8 +462,9 @@ public class BigQueryIOTest implements Serializable { public void testValidateReadSetsDefaultProject() throws Exception { String projectId = "someproject"; String datasetId = "somedataset"; - BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - options.setProject(projectId); + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject(projectId); + bqOptions.setTempLocation("gs://testbucket/testdir"); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(mockJobService) @@ -477,7 +472,7 @@ public class BigQueryIOTest implements Serializable { when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow( new RuntimeException("Unable to confirm BigQuery dataset presence")); - Pipeline p = TestPipeline.create(options); + Pipeline p = TestPipeline.create(bqOptions); TableReference tableRef = new TableReference(); tableRef.setDatasetId(datasetId); @@ -495,7 +490,11 @@ public class BigQueryIOTest implements Serializable { @Test @Category(RunnableOnService.class) public void testBuildSourceWithoutTableQueryOrValidation() { - Pipeline p = TestPipeline.create(); + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation("gs://testbucket/testdir"); + + Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( "Invalid BigQuery read operation, either table reference or query has to be set"); @@ -506,7 +505,11 @@ public class BigQueryIOTest implements Serializable { @Test @Category(RunnableOnService.class) public void testBuildSourceWithTableAndQuery() { - Pipeline p = TestPipeline.create(); + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation("gs://testbucket/testdir"); + + Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( "Invalid BigQuery read operation. Specifies both a query and a table, only one of these" @@ -521,7 +524,11 @@ public class BigQueryIOTest implements Serializable { @Test @Category(RunnableOnService.class) public void testBuildSourceWithTableAndFlatten() { - Pipeline p = TestPipeline.create(); + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation("gs://testbucket/testdir"); + + Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( "Invalid BigQuery read operation. Specifies a" @@ -536,7 +543,11 @@ public class BigQueryIOTest implements Serializable { @Test @Category(RunnableOnService.class) public void testBuildSourceWithTableAndFlattenWithoutValidation() { - Pipeline p = TestPipeline.create(); + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation("gs://testbucket/testdir"); + + Pipeline p = TestPipeline.create(bqOptions); thrown.expect(IllegalStateException.class); thrown.expectMessage( "Invalid BigQuery read operation. Specifies a" @@ -551,7 +562,11 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) - public void testReadFromTable() { + public void testReadFromTable() throws IOException { + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done") @@ -583,6 +598,10 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWrite() throws Exception { + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done", "done") @@ -617,6 +636,10 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWriteUnknown() throws Exception { + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done") @@ -1347,7 +1370,7 @@ public class BigQueryIOTest implements Serializable { DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition); tester.setSideInput(filesView, GlobalWindow.INSTANCE, files); - tester.processElement(bqOptions.getTempLocation()); + tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); List<KV<Long, List<String>>> partitions; if (expectedNumPartitions > 1) { @@ -1428,6 +1451,10 @@ public class BigQueryIOTest implements Serializable { @Test public void testRemoveTemporaryFiles() throws Exception { + BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + int numFiles = 10; List<String> fileNames = Lists.newArrayList(); String tempFilePrefix = bqOptions.getTempLocation() + "/";
