Naireen commented on code in PR #29230:
URL: https://github.com/apache/beam/pull/29230#discussion_r1387055705


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +237,115 @@ private static DataflowPipelineOptions 
buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names for Storage Write API for streaming 
pipelines are what we expect
+  // them to be. This is required since the Windmill backend expects the step 
to contain that name.
+  // For a more stable solution, we use use urn, but that is not currently 
used in the legacy java
+  // worker.
+  // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN 
information to GCP Runner.
+  @Test
+  public void testStorageWriteApiTransformNames() throws IOException, 
Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new 
TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply("StorageWriteApi", writeTransform);
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, 
sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+
+    boolean foundStep = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME)
+          .contains("StorageWriteApi/StorageApiLoads")) {
+        foundStep = true;
+      }
+    }
+    assertTrue(foundStep);
+  }
+
+  // Test that the transform names added for TextIO writes with autosharding. 
This is required since
+  // the Windmill backend expects the file write autosharded step to contain 
that name.
+  // For a more stable solution, we use use urn, but that is not currently 
used in the legacy java
+  // worker.
+  // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN 
information to GCP Runner.
+  @Test
+  public void testGCSWriteTransformNames() throws IOException, Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply(Window.into(FixedWindows.of(Duration.millis(1))))
+        .apply(
+            "WriteMyFile",
+            
TextIO.write().to("gs://bucket/object").withWindowedWrites().withNumShards(0));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, 
sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+
+    // Assert that at least one of the steps added was the autosharded write. 
This is added to
+    // ensure the name doesn't change.
+    boolean foundStep = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME)
+          .contains("/WriteAutoShardedBundlesToTempFiles")) {

Review Comment:
   So its WriteMyFile/WriteFiles/FinalizeTempFileBundles/, I changed this to 
check for "WriteFiles/WriteAutoShardedBundlesToTempFile, since we don't care 
about what the user has set. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to