y1chi commented on code in PR #29230:
URL: https://github.com/apache/beam/pull/29230#discussion_r1384063689
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
   return options;
 }
+ // Test that the transform names are what we expect them to be
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ List<String> inputs = Lists.newArrayList();
Review Comment:
You could probably just inline some input elements with Create.of(); since the input is not important, we can make this much simpler.
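For example, a minimal sketch of that simplification (the element values are arbitrary):

    // Inline a handful of arbitrary elements instead of building a 100-element list.
    p.apply(Create.of("a", "b", "c").withCoder(StringUtf8Coder.of()))
        .setIsBoundedInternal(IsBounded.UNBOUNDED)
        .apply("StorageWriteApi", writeTransform);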
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
   return options;
 }
+ // Test that the transform names are what we expect them to be
Review Comment:
Can we make the comment clearer about what is expected and why? Also, please add a TODO comment with a GitHub issue link for a more stable solution.
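One possible wording, based on the rename-guarding rationale already stated in the test body (the issue link is a placeholder to be filled in):

    // Assert that translation produces a step whose user name contains
    // "StorageWriteApi/StorageApiLoads", guarding against accidental renames
    // of the StorageApiLoads step.
    // TODO(<github issue link>): replace this name-based check with a more
    // stable mechanism.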
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
   return options;
 }
+ // Test that the transform names are what we expect them to be
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ List<String> inputs = Lists.newArrayList();
+ for (int i = 0; i < 100; ++i) {
+ inputs.add("mambo_number_" + i);
+ }
+
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new TableFieldSchema().setName("description").setType("STRING")))));
+
+ p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("StorageWriteApi", writeTransform);
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+ assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
Review Comment:
Is this check relevant to what we are testing?
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
   return options;
 }
+ // Test that the transform names are what we expect them to be
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ List<String> inputs = Lists.newArrayList();
+ for (int i = 0; i < 100; ++i) {
+ inputs.add("mambo_number_" + i);
+ }
+
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new TableFieldSchema().setName("description").setType("STRING")))));
+
+ p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("StorageWriteApi", writeTransform);
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+ assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
+
+ // Assert that at least one of the steps added was the StorageApiLoads step. This is added to
+ // ensure the name doesn't change.
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME).contains("/StorageApiLoads")) {
Review Comment:
I think StorageWriteApi should also be included in this step name. We can be more specific and assert that the step name contains "StorageWriteApi/StorageApiLoads" rather than just "/StorageApiLoads".
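A sketch of the tighter check, reusing the helpers already in this test (and assuming JUnit's assertTrue is statically imported here):

    boolean foundStep = false;
    for (Step step : jobSpecification.getJob().getSteps()) {
      String userName = getString(step.getProperties(), PropertyNames.USER_NAME);
      // Anchor the match on the user-assigned "StorageWriteApi" name as well,
      // so an unrelated step containing "/StorageApiLoads" cannot satisfy it.
      if (userName.contains("StorageWriteApi/StorageApiLoads")) {
        foundStep = true;
        break;
      }
    }
    assertTrue(foundStep);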
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +238,126 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
   return options;
 }
+ // Test that the transform names are what we expect them to be
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ List<String> inputs = Lists.newArrayList();
+ for (int i = 0; i < 100; ++i) {
+ inputs.add("mambo_number_" + i);
+ }
+
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new TableFieldSchema().setName("description").setType("STRING")))));
+
+ p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of()))
Review Comment:
Can we combine these two tests (StorageApiLoads and WriteAutoSharededBundlesToTempFiles) into one? We can just use different branches.
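A rough sketch of that combination, assuming a second BigQueryIO.Write<String> (here called fileLoadsTransform, hypothetical) configured for the file-loads path:

    PCollection<String> input =
        p.apply(Create.of("a", "b", "c").withCoder(StringUtf8Coder.of()))
            .setIsBoundedInternal(IsBounded.UNBOUNDED);

    // Branch 1: Storage Write API sink.
    input.apply("StorageWriteApi", writeTransform);

    // Branch 2: file-loads sink (fileLoadsTransform is hypothetical and would
    // need withMethod(Method.FILE_LOADS) plus auto-sharding, e.g. withAutoSharding()).
    input.apply("FileLoads", fileLoadsTransform);

    // Translate the pipeline once, then assert on both step names in the same job graph.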