Add support for repeated template invocations of BQIO.Write
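
A BigQuery load job ID must be unique per execution. Before this change the
token was minted when the pipeline was constructed, and therefore baked into
a Dataflow template, so running the same template twice reused the same job
ID and the second run's load jobs collided with the first's. The fix is to
mint the token inside the pipeline at execution time and hand it to
WriteTables/WriteRename as a singleton side input instead of a
ValueProvider. A minimal sketch of the pattern, assuming a Pipeline p in
scope (UUID.randomUUID() stands in here for BigQueryIO's private
randomUUIDString() helper):

    import java.util.UUID;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollectionView;

    // The dummy element exists only to trigger one invocation of the
    // function at run time; its value is ignored.
    PCollectionView<String> jobIdTokenView = p
        .apply("TriggerIdCreation", Create.of("ignored"))
        .apply("CreateJobId", MapElements.via(
            new SimpleFunction<String, String>() {
              @Override
              public String apply(String input) {
                // Evaluated once per pipeline execution, so every
                // template invocation gets a fresh job ID token.
                return UUID.randomUUID().toString();
              }
            }))
        .apply(View.<String>asSingleton());

Consuming DoFns are registered with .withSideInputs(jobIdTokenView) and read
the token via c.sideInput(jobIdTokenView) in processElement, rather than
capturing a jobIdToken.get() value fixed at construction time.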
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0676cf2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0676cf2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0676cf2e

Branch: refs/heads/master
Commit: 0676cf2eaad5dc51148197dfa1eced8d703222c6
Parents: 25a014f
Author: Sam McVeety <[email protected]>
Authored: Mon Feb 6 13:40:12 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Fri Feb 24 08:46:50 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 53 +++++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 46 +++++++----------
 2 files changed, 47 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0676cf2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 5dbec54..be9a786 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -97,9 +97,11 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -1684,9 +1686,6 @@ public class BigQueryIO {
 
     @Nullable private BigQueryServices bigQueryServices;
 
-    @VisibleForTesting @Nullable String stepUuid;
-    @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
     private static class TranslateTableSpecFunction implements
         SerializableFunction<BoundedWindow, TableReference> {
       private SerializableFunction<BoundedWindow, String> tableSpecFunction;
@@ -1991,11 +1990,7 @@ public class BigQueryIO {
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-      stepUuid = randomUUIDString();
-      jobUuid = NestedValueProvider.of(
-          StaticValueProvider.of(options.getJobName()), new CreatePerBeamJobUuid(stepUuid));
-      ValueProvider<String> jobIdToken = NestedValueProvider.of(
-          jobUuid, new BeamJobUuidToBigQueryJobUuid());
+      String stepUuid = randomUUIDString();
 
       String tempLocation = options.getTempLocation();
       String tempFilePrefix;
@@ -2010,7 +2005,18 @@
             e);
       }
 
+      // Create a singleton job ID token at execution time.
       PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+      PCollectionView<String> jobIdTokenView = p
+          .apply("TriggerIdCreation", Create.of("ignored"))
+          .apply("CreateJobId", MapElements.via(
+              new SimpleFunction<String, String>() {
+                @Override
+                public String apply(String input) {
+                  return randomUUIDString();
+                }
+              }))
+          .apply(View.<String>asSingleton());
 
       PCollection<TableRow> inputInGlobalWindow =
           input.apply(
@@ -2043,26 +2049,27 @@
           .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
               false,
               bqServices,
-              jobIdToken,
+              jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
               jsonSchema,
               WriteDisposition.WRITE_EMPTY,
               CreateDisposition.CREATE_IF_NEEDED,
-              tableDescription)));
+              tableDescription))
+          .withSideInputs(jobIdTokenView));
 
       PCollectionView<Iterable<String>> tempTablesView = tempTables
           .apply("TempTablesView", View.<String>asIterable());
       singleton.apply(ParDo
           .of(new WriteRename(
               bqServices,
-              jobIdToken,
+              jobIdTokenView,
               NestedValueProvider.of(table, new TableRefToJson()),
               writeDisposition,
               createDisposition,
               tempTablesView,
               tableDescription))
-          .withSideInputs(tempTablesView));
+          .withSideInputs(tempTablesView, jobIdTokenView));
 
       // Write single partition to final table
       partitions.get(singlePartitionTag)
@@ -2070,13 +2077,14 @@
           .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
               true,
               bqServices,
-              jobIdToken,
+              jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
               jsonSchema,
               writeDisposition,
               createDisposition,
-              tableDescription)));
+              tableDescription))
+          .withSideInputs(jobIdTokenView));
 
       return PDone.in(input.getPipeline());
     }
@@ -2325,7 +2333,7 @@
   static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
     private final boolean singlePartition;
     private final BigQueryServices bqServices;
-    private final ValueProvider<String> jobIdToken;
+    private final PCollectionView<String> jobIdToken;
     private final String tempFilePrefix;
     private final ValueProvider<String> jsonTableRef;
     private final ValueProvider<String> jsonSchema;
@@ -2336,7 +2344,7 @@
     public WriteTables(
         boolean singlePartition,
         BigQueryServices bqServices,
-        ValueProvider<String> jobIdToken,
+        PCollectionView<String> jobIdToken,
         String tempFilePrefix,
         ValueProvider<String> jsonTableRef,
         ValueProvider<String> jsonSchema,
@@ -2357,7 +2365,8 @@
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
-      String jobIdPrefix = String.format(jobIdToken.get() + "_%05d", c.element().getKey());
+      String jobIdPrefix = String.format(
+          c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
       TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
       if (!singlePartition) {
         ref.setTableId(jobIdPrefix);
       }
@@ -2460,8 +2469,6 @@
       super.populateDisplayData(builder);
 
       builder
-          .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
-              .withLabel("Job ID Token"))
           .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
               .withLabel("Temporary File Prefix"))
           .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
@@ -2478,7 +2485,7 @@
    */
   static class WriteRename extends DoFn<String, Void> {
     private final BigQueryServices bqServices;
-    private final ValueProvider<String> jobIdToken;
+    private final PCollectionView<String> jobIdToken;
     private final ValueProvider<String> jsonTableRef;
     private final WriteDisposition writeDisposition;
     private final CreateDisposition createDisposition;
@@ -2487,7 +2494,7 @@
 
     public WriteRename(
         BigQueryServices bqServices,
-        ValueProvider<String> jobIdToken,
+        PCollectionView<String> jobIdToken,
         ValueProvider<String> jsonTableRef,
         WriteDisposition writeDisposition,
         CreateDisposition createDisposition,
@@ -2518,7 +2525,7 @@
       copy(
           bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
           bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-          jobIdToken.get(),
+          c.sideInput(jobIdToken),
          fromJsonString(jsonTableRef.get(), TableReference.class),
          tempTables,
          writeDisposition,
@@ -2598,8 +2605,6 @@
      super.populateDisplayData(builder);
 
      builder
-          .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
-              .withLabel("Job ID Token"))
          .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
              .withLabel("Table Reference"))
          .add(DisplayData.item("writeDisposition", writeDisposition.toString())

http://git-wip-us.apache.org/repos/asf/beam/blob/0676cf2e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3aa90cf..fe41703 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -140,6 +140,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -2200,6 +2201,8 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteTables() throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done", "done", "done")
@@ -2223,10 +2226,18 @@
       expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
     }
 
+    PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
+        p,
+        WindowingStrategy.globalDefault(),
+        StringUtf8Coder.of());
+    PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
+    PCollectionView<String> jobIdTokenView =
+        jobIdTokenCollection.apply(View.<String>asSingleton());
+
     WriteTables writeTables = new WriteTables(
         false,
         fakeBqServices,
-        StaticValueProvider.of(jobIdToken),
+        jobIdTokenView,
         tempFilePrefix,
         StaticValueProvider.of(jsonTable),
         StaticValueProvider.of(jsonSchema),
@@ -2235,6 +2246,7 @@
         null);
 
     DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = DoFnTester.of(writeTables);
+    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     for (KV<Long, Iterable<List<String>>> partition : partitions) {
       tester.processElement(partition);
     }
@@ -2296,10 +2308,13 @@
         p,
         WindowingStrategy.globalDefault(),
         StringUtf8Coder.of());
+    PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
+    PCollectionView<String> jobIdTokenView =
+        jobIdTokenCollection.apply(View.<String>asSingleton());
 
     WriteRename writeRename = new WriteRename(
         fakeBqServices,
-        StaticValueProvider.of(jobIdToken),
+        jobIdTokenView,
         StaticValueProvider.of(jsonTable),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED,
@@ -2308,6 +2323,7 @@
 
     DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(null);
   }
 
@@ -2473,30 +2489,4 @@ public class BigQueryIOTest implements Serializable {
     assertNotEquals(read1.stepUuid, read2.stepUuid);
     assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
   }
-
-  @Test
-  public void testUniqueStepIdWrite() {
-    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-    Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Write.Bound write1 = BigQueryIO.Write
-        .to(options.getOutputTable())
-        .withSchema(NestedValueProvider.of(
-            options.getOutputSchema(), new JsonSchemaToTableSchema()))
-        .withoutValidation();
-    BigQueryIO.Write.Bound write2 = BigQueryIO.Write
-        .to(options.getOutputTable())
-        .withSchema(NestedValueProvider.of(
-            options.getOutputSchema(), new JsonSchemaToTableSchema()))
-        .withoutValidation();
-    pipeline
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(write1);
-    pipeline
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(write2);
-    assertNotEquals(write1.stepUuid, write2.stepUuid);
-    assertNotEquals(write1.jobUuid.get(), write2.jobUuid.get());
-  }
 }
