This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b0edf42489 Add DataflowRunner unit test to guard against change on important IO transform step names (#29230)
8b0edf42489 is described below

commit 8b0edf424893e2686ef2d76e08b451f84e628ca8
Author: Naireen Hussain <[email protected]>
AuthorDate: Thu Nov 16 09:45:28 2023 -0800

    Add DataflowRunner unit test to guard against change on important IO transform step names (#29230)
    
    Co-authored-by: Naireen <[email protected]>
---
 .../dataflow/DataflowPipelineTranslatorTest.java   | 119 +++++++++++++++++++++
 1 file changed, 119 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e5fb13875cd..6787885d751 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -38,6 +38,10 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.Step;
@@ -85,6 +89,8 @@ import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -101,6 +107,7 @@ import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
@@ -115,6 +122,7 @@ import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
@@ -229,6 +237,117 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     return options;
   }
 
+  // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+  // them to be. This is required since the Windmill backend expects the step to contain that name.
+  // For a more stable solution, we should use a URN, but URNs are not currently used in the
+  // legacy Java worker.
+  //
+  // TODO(https://github.com/apache/beam/issues/29338): Pass in URN information to Dataflow Runner.
+  @Test // passes when any translated step's user name contains "StorageWriteApi/StorageApiLoads"
+  public void testStorageWriteApiTransformNames() throws IOException, 
Exception { // NOTE(review): IOException is redundant alongside Exception
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor()); // p is still empty; visitor discarded
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) // write method under test
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new 
TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED) // mark the Create output as unbounded
+        .apply("StorageWriteApi", writeTransform); // this name prefixes the step asserted below
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, 
sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class)); // fresh options, not `options`
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+
+    boolean foundStep = false; // set once a step with the expected user name is seen
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME)
+          .contains("StorageWriteApi/StorageApiLoads")) {
+        foundStep = true;
+      }
+    }
+    assertTrue(foundStep); // at least one translated step must carry the expected name
+  }
+
+  // Test that the transform names added for TextIO writes with autosharding are what we expect
+  // them to be. This is required since the Windmill backend expects the file write autosharded
+  // step to contain that name. For a more stable solution, we should use a URN, but URNs are not
+  // currently used in the legacy Java worker.
+  //
+  // TODO(https://github.com/apache/beam/issues/29338): Pass in URN information to Dataflow Runner.
+  @Test // passes when a step's user name contains "WriteFiles/WriteAutoShardedBundlesToTempFiles"
+  public void testGCSWriteTransformNames() throws IOException, Exception { // NOTE(review): IOException is redundant
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor()); // p is still empty; visitor discarded
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED) // mark the Create output as unbounded
+        .apply(Window.into(FixedWindows.of(Duration.millis(1)))) // 1 ms fixed windows
+        .apply(
+            "WriteMyFile", // withNumShards(0) below presumably means runner-chosen sharding - confirm
+            
TextIO.write().to("gs://bucket/object").withWindowedWrites().withNumShards(0));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, 
sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class)); // fresh options, not `options`
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+
+    // Assert that at least one of the steps added was the autosharded write. 
This is added to
+    // ensure the name doesn't change.
+    boolean foundStep = false; // set once a step with the expected user name is seen
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME)
+          .contains("WriteFiles/WriteAutoShardedBundlesToTempFiles")) {
+        foundStep = true;
+      }
+    }
+    assertTrue(foundStep);
+  }
+
   @Test
   public void testNetworkConfig() throws IOException {
     final String testNetwork = "test-network";

Reply via email to