y1chi commented on code in PR #29230:
URL: https://github.com/apache/beam/pull/29230#discussion_r1385612930
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +237,115 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+ // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+ // them to be. This is required since the Windmill backend expects the step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
Review Comment:
```suggestion
// For a more stable solution, we should use URN, but that is not currently used in the legacy java
```
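For context, a minimal sketch of what URN-based matching could look like once URN information reaches the runner, per the TODO above. This is an illustration only, not part of this PR: the `containsTransformWithUrn` helper is hypothetical, and the URN string to match is left as a parameter since none is defined here. It walks the portable pipeline proto that the test already builds.

```java
import org.apache.beam.model.pipeline.v1.RunnerApi;

// Hypothetical helper (not in this PR): match a transform by its URN in the
// portable pipeline proto instead of by a user-visible step name.
static boolean containsTransformWithUrn(RunnerApi.Pipeline pipelineProto, String urn) {
  return pipelineProto.getComponents().getTransformsMap().values().stream()
      .anyMatch(t -> urn.equals(t.getSpec().getUrn()));
}
```

Matching on a URN would keep the assertion stable across user-name refactors, which is the concern the comment raises.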
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +237,115 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+ // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+ // them to be. This is required since the Windmill backend expects the step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to GCP Runner.
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new TableFieldSchema().setName("description").setType("STRING")))));
+
+ p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("StorageWriteApi", writeTransform);
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME)
+ .contains("StorageWriteApi/StorageApiLoads")) {
+ foundStep = true;
+ }
+ }
+ assertTrue(foundStep);
+ }
+
+ // Test that the transform names added for TextIO writes with autosharding. This is required since
+ // the Windmill backend expects the file write autosharded step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
Review Comment:
```suggestion
// For a more stable solution, we should use URN, but that is not currently used in the legacy java
```
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +237,115 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+ // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+ // them to be. This is required since the Windmill backend expects the step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to GCP Runner.
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new TableFieldSchema().setName("description").setType("STRING")))));
+
+ p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("StorageWriteApi", writeTransform);
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME)
+ .contains("StorageWriteApi/StorageApiLoads")) {
+ foundStep = true;
+ }
+ }
+ assertTrue(foundStep);
+ }
+
+ // Test that the transform names added for TextIO writes with autosharding. This is required since
+ // the Windmill backend expects the file write autosharded step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to GCP Runner.
+ @Test
+ public void testGCSWriteTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply(Window.into(FixedWindows.of(Duration.millis(1))))
+ .apply(
+ "WriteMyFile",
+ TextIO.write().to("gs://bucket/object").withWindowedWrites().withNumShards(0));
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ // Assert that at least one of the steps added was the autosharded write. This is added to
+ // ensure the name doesn't change.
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME)
+ .contains("/WriteAutoShardedBundlesToTempFiles")) {
Review Comment:
nit: WriteMyFile/WriteAutoShardedBundlesToTempFiles?
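Applying the nit, the check inside the test's existing loop would pin the fully qualified step name (a sketch using only variables already present in the diff above):

```java
// Anchoring on the "WriteMyFile/" prefix ties the match to the transform
// applied in this test, so an unrelated step containing the generic
// suffix cannot satisfy the assertion.
if (getString(step.getProperties(), PropertyNames.USER_NAME)
    .contains("WriteMyFile/WriteAutoShardedBundlesToTempFiles")) {
  foundStep = true;
}
```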
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +237,115 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+ // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+ // them to be. This is required since the Windmill backend expects the step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to GCP Runner.
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException, Exception {
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new TableFieldSchema().setName("description").setType("STRING")))));
+
+ p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("StorageWriteApi", writeTransform);
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME)
+ .contains("StorageWriteApi/StorageApiLoads")) {
+ foundStep = true;
+ }
+ }
+ assertTrue(foundStep);
+ }
+
+ // Test that the transform names added for TextIO writes with autosharding. This is required since
+ // the Windmill backend expects the file write autosharded step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to GCP Runner.
Review Comment:
```suggestion
// TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to Dataflow Runner.
```
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java:
##########
@@ -229,6 +237,115 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
     return options;
   }
+ // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+ // them to be. This is required since the Windmill backend expects the step to contain that name.
+ // For a more stable solution, we use use urn, but that is not currently used in the legacy java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to GCP Runner.
Review Comment:
```suggestion
// TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to Dataflow Runner.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]