Choose BigQuery Write implementation based on Input PCollection

Stop using PipelineOptions, and instead use the boundedness of the
input to choose how to write to BigQuery. This means that runners that
don't use the streaming flag still function appropriately.
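In effect, the same BigQueryIO.Write transform now picks its implementation
from input.isBounded() rather than from a pipeline-wide streaming flag. As a
hedged illustration (not part of this commit), the sketch below shows a
pipeline exercising both paths; the table specs, schema, and row-mapping
function are placeholders:

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.TableRowJsonCoder;
    import org.apache.beam.sdk.io.CountingInput;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;

    public class BoundednessDispatchSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        TableSchema schema = new TableSchema(); // placeholder schema

        // Bounded input: the write goes through BigQuery load jobs.
        p.apply(Create.of(new TableRow().set("n", 0L)).withCoder(TableRowJsonCoder.of()))
            .apply(BigQueryIO.Write.to("project:dataset.bounded_table").withSchema(schema));

        // Unbounded input: isBounded() == UNBOUNDED, so the write goes through
        // StreamWithDeDup and BigQuery's streaming import API; no streaming
        // flag in PipelineOptions is consulted.
        p.apply(CountingInput.unbounded())
            .apply(MapElements.via(new SimpleFunction<Long, TableRow>() {
              @Override
              public TableRow apply(Long n) {
                return new TableRow().set("n", n);
              }
            }))
            .setCoder(TableRowJsonCoder.of())
            .apply(BigQueryIO.Write.to("project:dataset.unbounded_table").withSchema(schema));

        p.run();
      }
    }

Because the decision is made per input PCollection, a single pipeline can mix
bounded and unbounded writes, and runners that never set the streaming option
still take the correct path.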
Fixes BEAM-746

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75d137b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75d137b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75d137b8

Branch: refs/heads/gearpump-runner
Commit: 75d137b81ae240ad1a2e8942627738a6871581c1
Parents: 6d9d8bc
Author: Thomas Groh <[email protected]>
Authored: Wed Oct 12 17:58:57 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Mon Oct 24 07:37:41 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  7 ++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 35 +++++++++++++++-----
 2 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75d137b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 5626067..50c5ae9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -119,6 +119,7 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
@@ -1748,9 +1749,9 @@ public class BigQueryIO {
       BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
       BigQueryServices bqServices = getBigQueryServices();
 
-      // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
-      // and BigQuery's streaming import API.
-      if (options.isStreaming() || tableRefFunction != null) {
+      // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
+      // StreamWithDeDup and BigQuery's streaming import API.
+      if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
         return input.apply(
             new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices));
       }


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75d137b8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 74c35a6..51a69a2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -91,6 +91,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
@@ -120,8 +121,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -1190,13 +1193,12 @@ public class BigQueryIOTest implements Serializable {
     assertThat(displayData, hasDisplayItem("validation", false));
   }
 
-  private void testWriteValidatesDataset(boolean streaming) throws Exception {
+  private void testWriteValidatesDataset(boolean unbounded) throws Exception {
     String projectId = "someproject";
     String datasetId = "somedataset";
 
     BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     options.setProject(projectId);
-    options.setStreaming(streaming);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(mockJobService)
@@ -1210,17 +1212,34 @@ public class BigQueryIOTest implements Serializable {
     tableRef.setDatasetId(datasetId);
     tableRef.setTableId("sometable");
 
+    PCollection<TableRow> tableRows;
+    if (unbounded) {
+      tableRows =
+          p.apply(CountingInput.unbounded())
+              .apply(
+                  MapElements.via(
+                      new SimpleFunction<Long, TableRow>() {
+                        @Override
+                        public TableRow apply(Long input) {
+                          return null;
+                        }
+                      }))
+              .setCoder(TableRowJsonCoder.of());
+    } else {
+      tableRows = p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()));
+    }
+
     thrown.expect(RuntimeException.class);
     // Message will be one of following depending on the execution environment.
     thrown.expectMessage(
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write
-            .to(tableRef)
-            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-            .withSchema(new TableSchema())
-            .withTestServices(fakeBqServices));
+    tableRows
+        .apply(
+            BigQueryIO.Write.to(tableRef)
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withSchema(new TableSchema())
+                .withTestServices(fakeBqServices));
   }
 
   @Test
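The excerpt cuts off at the start of the next test. Presumably the helper
above is driven by one bounded and one unbounded case, along these lines
(a hypothetical sketch; the method names are assumed, not shown in this diff):

    @Test
    public void testWriteValidatesDatasetBatch() throws Exception {
      // Bounded input exercises the batch (load job) path.
      testWriteValidatesDataset(false);
    }

    @Test
    public void testWriteValidatesDatasetStreaming() throws Exception {
      // Unbounded input exercises the StreamWithDeDup (streaming insert) path.
      testWriteValidatesDataset(true);
    }

Note that the unbounded branch maps CountingInput's longs to null rows; this
is harmless because dataset validation throws before any element is produced.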
