Naireen commented on code in PR #29230:
URL: https://github.com/apache/beam/pull/29230#discussion_r1385603743


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be

Review Comment:
   Updated, plus added link to issue.



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be
+  @Test
+  public void testStorageWriteApiTransformNames() throws Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < 100; ++i) {
+      inputs.add("mambo_number_" + i);
+    }
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply("StorageWriteApi", writeTransform);
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+    assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
+
+    // Assert that at least one of the steps added was the StorageApiLoads step. This is added to
+    // ensure the name doesn't change.
+    boolean foundStep = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME).contains("/StorageApiLoads")) {

Review Comment:
   Done.
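
   Note for readers of the archive: the quoted hunk above is truncated at the commented line, as GitHub review emails cut the quote at the line under discussion. A minimal sketch of how such an assertion loop could conclude, assuming JUnit's assertTrue and the test class's existing getString helper (the exact tail is an assumption, not necessarily the PR's code):

       // Scan the translated job's steps for one whose user name contains
       // "/StorageApiLoads"; fail the test if the step was renamed or dropped.
       boolean foundStep = false;
       for (Step step : jobSpecification.getJob().getSteps()) {
         if (getString(step.getProperties(), PropertyNames.USER_NAME).contains("/StorageApiLoads")) {
           foundStep = true;
           break;
         }
       }
       assertTrue("Expected a step whose user name contains /StorageApiLoads", foundStep);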


