Naireen commented on code in PR #29230:
URL: https://github.com/apache/beam/pull/29230#discussion_r1385603743
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+  // Test that the transform names are what we expect them to be
Review Comment:
Updated, plus added link to issue.
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+  // Test that the transform names are what we expect them to be
+  @Test
+  public void testStorageWriteApiTransformNames() throws Exception {
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < 100; ++i) {
+      inputs.add("mambo_number_" + i);
+    }
+
+    BigQueryIO.Write<String> writeTransform =
+        BigQueryIO.<String>write()
+            .withFormatFunction(
+                (SerializableFunction<String, TableRow>)
+                    input1 -> new TableRow().set("description", input1))
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+            .to(
+                new TableReference()
+                    .setProjectId("project")
+                    .setDatasetId("dataset")
+                    .setTableId("table"))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        new ArrayList<>(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("description").setType("STRING")))));
+
+    p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply("StorageWriteApi", writeTransform);
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+    DataflowPipelineTranslator t =
+        DataflowPipelineTranslator.fromOptions(
+            PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+    assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
+
+    // Assert that at least one of the steps added was the StorageApiLoads step. This is added to
+    // ensure the name doesn't change.
+    boolean foundStep = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getString(step.getProperties(), PropertyNames.USER_NAME).contains("/StorageApiLoads")) {
Review Comment:
Done.
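
Note: the diff excerpt above cuts off at the review anchor, so the tail of the loop is not shown. A minimal sketch of how such a check presumably concludes, assuming JUnit's assertTrue plus the getString and PropertyNames helpers already visible in the excerpt (an illustration, not the PR's verbatim code):

    // Sketch: flag any step whose user name contains the expected
    // "/StorageApiLoads" suffix, then assert at least one was found.
    boolean foundStep = false;
    for (Step step : jobSpecification.getJob().getSteps()) {
      if (getString(step.getProperties(), PropertyNames.USER_NAME).contains("/StorageApiLoads")) {
        foundStep = true;
        break; // one match is enough to confirm the transform name is stable
      }
    }
    assertTrue("Expected a StorageApiLoads step in the translated job", foundStep);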
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org