y1chi commented on code in PR #29230:
URL: https://github.com/apache/beam/pull/29230#discussion_r1384063689


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be
+  @Test
+  public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    List<String> inputs = Lists.newArrayList();

Review Comment:
   You could probably just inline a few input elements with Create.of(); since the input itself is not important, that would make this much simpler.
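   
   A rough sketch of what that could look like (the element values here are arbitrary):
   
   ```java
   // The test only cares about step names, so a couple of inline elements is enough.
   p.apply(Create.of("foo", "bar").withCoder(StringUtf8Coder.of()))
       .setIsBoundedInternal(IsBounded.UNBOUNDED)
       .apply("StorageWriteApi", writeTransform);
   ```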



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be

Review Comment:
   Can we make the comment clearer about what is expected and why? Also, please add a TODO comment with a GitHub issue link pointing to a more stable solution.
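   
   Something along these lines, perhaps (the issue number is a placeholder and the wording is only a suggestion):
   
   ```java
   // Verifies that the Dataflow translation of a Storage Write API sink produces a step whose
   // user name contains "StorageApiLoads", so the step name does not change unintentionally.
   // TODO(https://github.com/apache/beam/issues/<issue-number>): replace this name-based check
   // with a more stable solution.
   ```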



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be
+  @Test
+  public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < 100; ++i) {
+      inputs.add("mambo_number_" + i);
+    }
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply("StorageWriteApi", writeTransform);
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+    assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());

Review Comment:
   Is this check relevant to what we are testing?



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be
+  @Test
+  public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < 100; ++i) {
+      inputs.add("mambo_number_" + i);
+    }
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply("StorageWriteApi", writeTransform);
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+    assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
+
+    // Assert that at least one of the steps added was the StorageApiLoads step. This is added to
+    // ensure the name doesn't change.
+    boolean foundStep = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME).contains("/StorageApiLoads")) {

Review Comment:
   I think StorageWriteApi should also be included in this step name. We can be more specific and check that the StorageWriteApi step's user name contains "StorageWriteApi/StorageApiLoads".
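   
   For example, something like this (a sketch only, reusing the helpers already used in this test):
   
   ```java
   // Checking the full prefix ties the assertion to the "StorageWriteApi" name applied above.
   if (getString(step.getProperties(), PropertyNames.USER_NAME)
       .contains("StorageWriteApi/StorageApiLoads")) {
     foundStep = true;
   }
   ```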



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
 
+  // Test that the transform names are what we expect them to be
+  @Test
+  public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < 100; ++i) {
+      inputs.add("mambo_number_" + i);
+    }
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))

Review Comment:
   Can we combine these two tests (StorageApiLoads and WriteAutoSharededBundlesToTempFiles) into one? We can just use different branches.
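   
   A rough sketch of one way to branch from the same input (the second write and its "FileLoads" name are hypothetical, just to illustrate the idea):
   
   ```java
   // Apply both sinks to the same input so a single translation exercises both branches,
   // then assert on each expected step name in the resulting job.
   PCollection<String> input =
       p.apply(Create.of("foo", "bar").withCoder(StringUtf8Coder.of()))
           .setIsBoundedInternal(IsBounded.UNBOUNDED);
   input.apply("StorageWriteApi", writeTransform);
   input.apply("FileLoads", fileLoadsWriteTransform); // hypothetical second branch
   ```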


