Add support for repeated template invocations for BQIO.Write

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0676cf2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0676cf2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0676cf2e

Branch: refs/heads/master
Commit: 0676cf2eaad5dc51148197dfa1eced8d703222c6
Parents: 25a014f
Author: Sam McVeety <[email protected]>
Authored: Mon Feb 6 13:40:12 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Fri Feb 24 08:46:50 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 53 +++++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 46 +++++++----------
 2 files changed, 47 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0676cf2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 5dbec54..be9a786 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -97,9 +97,11 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -1684,9 +1686,6 @@ public class BigQueryIO {
 
       @Nullable private BigQueryServices bigQueryServices;
 
-      @VisibleForTesting @Nullable String stepUuid;
-      @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
       private static class TranslateTableSpecFunction implements
           SerializableFunction<BoundedWindow, TableReference> {
         private SerializableFunction<BoundedWindow, String> tableSpecFunction;
@@ -1991,11 +1990,7 @@ public class BigQueryIO {
 
         ValueProvider<TableReference> table = 
getTableWithDefaultProject(options);
 
-        stepUuid = randomUUIDString();
-        jobUuid = NestedValueProvider.of(
-           StaticValueProvider.of(options.getJobName()), new 
CreatePerBeamJobUuid(stepUuid));
-        ValueProvider<String> jobIdToken = NestedValueProvider.of(
-            jobUuid, new BeamJobUuidToBigQueryJobUuid());
+        String stepUuid = randomUUIDString();
 
         String tempLocation = options.getTempLocation();
         String tempFilePrefix;
@@ -2010,7 +2005,18 @@ public class BigQueryIO {
               e);
         }
 
+        // Create a singleton job ID token at execution time.
         PCollection<String> singleton = p.apply("Create", 
Create.of(tempFilePrefix));
+        PCollectionView<String> jobIdTokenView = p
+            .apply("TriggerIdCreation", Create.of("ignored"))
+            .apply("CreateJobId", MapElements.via(
+                new SimpleFunction<String, String>() {
+                  @Override
+                  public String apply(String input) {
+                    return randomUUIDString();
+                  }
+                }))
+            .apply(View.<String>asSingleton());
 
         PCollection<TableRow> inputInGlobalWindow =
             input.apply(
@@ -2043,26 +2049,27 @@ public class BigQueryIO {
             .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
                 false,
                 bqServices,
-                jobIdToken,
+                jobIdTokenView,
                 tempFilePrefix,
                 NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 WriteDisposition.WRITE_EMPTY,
                 CreateDisposition.CREATE_IF_NEEDED,
-                tableDescription)));
+                tableDescription))
+            .withSideInputs(jobIdTokenView));
 
         PCollectionView<Iterable<String>> tempTablesView = tempTables
             .apply("TempTablesView", View.<String>asIterable());
         singleton.apply(ParDo
             .of(new WriteRename(
                 bqServices,
-                jobIdToken,
+                jobIdTokenView,
                 NestedValueProvider.of(table, new TableRefToJson()),
                 writeDisposition,
                 createDisposition,
                 tempTablesView,
                 tableDescription))
-            .withSideInputs(tempTablesView));
+            .withSideInputs(tempTablesView, jobIdTokenView));
 
         // Write single partition to final table
         partitions.get(singlePartitionTag)
@@ -2070,13 +2077,14 @@ public class BigQueryIO {
             .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
                 true,
                 bqServices,
-                jobIdToken,
+                jobIdTokenView,
                 tempFilePrefix,
                 NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 writeDisposition,
                 createDisposition,
-                tableDescription)));
+                tableDescription))
+            .withSideInputs(jobIdTokenView));
 
         return PDone.in(input.getPipeline());
       }
@@ -2325,7 +2333,7 @@ public class BigQueryIO {
     static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, 
String> {
       private final boolean singlePartition;
       private final BigQueryServices bqServices;
-      private final ValueProvider<String> jobIdToken;
+      private final PCollectionView<String> jobIdToken;
       private final String tempFilePrefix;
       private final ValueProvider<String> jsonTableRef;
       private final ValueProvider<String> jsonSchema;
@@ -2336,7 +2344,7 @@ public class BigQueryIO {
       public WriteTables(
           boolean singlePartition,
           BigQueryServices bqServices,
-          ValueProvider<String> jobIdToken,
+          PCollectionView<String> jobIdToken,
           String tempFilePrefix,
           ValueProvider<String> jsonTableRef,
           ValueProvider<String> jsonSchema,
@@ -2357,7 +2365,8 @@ public class BigQueryIO {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<String> partition = 
Lists.newArrayList(c.element().getValue()).get(0);
-        String jobIdPrefix = String.format(jobIdToken.get() + "_%05d", 
c.element().getKey());
+        String jobIdPrefix = String.format(
+            c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
         TableReference ref = fromJsonString(jsonTableRef.get(), 
TableReference.class);
         if (!singlePartition) {
           ref.setTableId(jobIdPrefix);
@@ -2460,8 +2469,6 @@ public class BigQueryIO {
         super.populateDisplayData(builder);
 
         builder
-            .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
-                .withLabel("Job ID Token"))
             .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
                 .withLabel("Temporary File Prefix"))
             .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
@@ -2478,7 +2485,7 @@ public class BigQueryIO {
      */
     static class WriteRename extends DoFn<String, Void> {
       private final BigQueryServices bqServices;
-      private final ValueProvider<String> jobIdToken;
+      private final PCollectionView<String> jobIdToken;
       private final ValueProvider<String> jsonTableRef;
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
@@ -2487,7 +2494,7 @@ public class BigQueryIO {
 
       public WriteRename(
           BigQueryServices bqServices,
-          ValueProvider<String> jobIdToken,
+          PCollectionView<String> jobIdToken,
           ValueProvider<String> jsonTableRef,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition,
@@ -2518,7 +2525,7 @@ public class BigQueryIO {
         copy(
             
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
             
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            jobIdToken.get(),
+            c.sideInput(jobIdToken),
             fromJsonString(jsonTableRef.get(), TableReference.class),
             tempTables,
             writeDisposition,
@@ -2598,8 +2605,6 @@ public class BigQueryIO {
         super.populateDisplayData(builder);
 
         builder
-            .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
-                .withLabel("Job ID Token"))
             .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
                 .withLabel("Table Reference"))
             .add(DisplayData.item("writeDisposition", 
writeDisposition.toString())

http://git-wip-us.apache.org/repos/asf/beam/blob/0676cf2e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3aa90cf..fe41703 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -140,6 +140,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -2200,6 +2201,8 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteTables() throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done", "done", "done")
@@ -2223,10 +2226,18 @@ public class BigQueryIOTest implements Serializable {
       expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", 
jobIdToken, i));
     }
 
+    PCollectionView<Iterable<String>> tempTablesView = 
PCollectionViews.iterableView(
+        p,
+        WindowingStrategy.globalDefault(),
+        StringUtf8Coder.of());
+    PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", 
Create.of("jobId"));
+    PCollectionView<String> jobIdTokenView =
+        jobIdTokenCollection.apply(View.<String>asSingleton());
+
     WriteTables writeTables = new WriteTables(
         false,
         fakeBqServices,
-        StaticValueProvider.of(jobIdToken),
+        jobIdTokenView,
         tempFilePrefix,
         StaticValueProvider.of(jsonTable),
         StaticValueProvider.of(jsonSchema),
@@ -2235,6 +2246,7 @@ public class BigQueryIOTest implements Serializable {
         null);
 
     DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = 
DoFnTester.of(writeTables);
+    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     for (KV<Long, Iterable<List<String>>> partition : partitions) {
       tester.processElement(partition);
     }
@@ -2296,10 +2308,13 @@ public class BigQueryIOTest implements Serializable {
         p,
         WindowingStrategy.globalDefault(),
         StringUtf8Coder.of());
+    PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", 
Create.of("jobId"));
+    PCollectionView<String> jobIdTokenView =
+        jobIdTokenCollection.apply(View.<String>asSingleton());
 
     WriteRename writeRename = new WriteRename(
         fakeBqServices,
-        StaticValueProvider.of(jobIdToken),
+        jobIdTokenView,
         StaticValueProvider.of(jsonTable),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED,
@@ -2308,6 +2323,7 @@ public class BigQueryIOTest implements Serializable {
 
     DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(null);
   }
 
@@ -2473,30 +2489,4 @@ public class BigQueryIOTest implements Serializable {
     assertNotEquals(read1.stepUuid, read2.stepUuid);
     assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
   }
-
-  @Test
-  public void testUniqueStepIdWrite() {
-    RuntimeTestOptions options = 
PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-    Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Write.Bound write1 = BigQueryIO.Write
-        .to(options.getOutputTable())
-        .withSchema(NestedValueProvider.of(
-            options.getOutputSchema(), new JsonSchemaToTableSchema()))
-        .withoutValidation();
-    BigQueryIO.Write.Bound write2 = BigQueryIO.Write
-        .to(options.getOutputTable())
-        .withSchema(NestedValueProvider.of(
-            options.getOutputSchema(), new JsonSchemaToTableSchema()))
-        .withoutValidation();
-    pipeline
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(write1);
-    pipeline
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(write2);
-    assertNotEquals(write1.stepUuid, write2.stepUuid);
-    assertNotEquals(write1.jobUuid.get(), write2.jobUuid.get());
-  }
 }

Reply via email to