This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8b0edf42489 Add DataflowRunner unit test to guard against change on
important IO transform step names (#29230)
8b0edf42489 is described below
commit 8b0edf424893e2686ef2d76e08b451f84e628ca8
Author: Naireen Hussain <[email protected]>
AuthorDate: Thu Nov 16 09:45:28 2023 -0800
Add DataflowRunner unit test to guard against change on important IO
transform step names (#29230)
Co-authored-by: Naireen <[email protected]>
---
.../dataflow/DataflowPipelineTranslatorTest.java | 119 +++++++++++++++++++++
1 file changed, 119 insertions(+)
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e5fb13875cd..6787885d751 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -38,6 +38,10 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
@@ -85,6 +89,8 @@ import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -101,6 +107,7 @@ import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
@@ -115,6 +122,7 @@ import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
@@ -229,6 +237,117 @@ public class DataflowPipelineTranslatorTest implements Serializable {
return options;
}
+ // Test that the transform names for Storage Write API for streaming pipelines are what we expect
+ // them to be. This is required since the Windmill backend expects the step to contain that name.
+ // For a more stable solution, we should use URN, but that is not currently used in the legacy
+ // java
+ // worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to Dataflow Runner.
+ @Test
+ public void testStorageWriteApiTransformNames() throws IOException,
Exception { // NOTE(review): IOException is already covered by Exception; the clause is redundant.
+ // Guards the user-visible step name of the BigQuery Storage Write API transform after translation.
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+ // NOTE(review): no transforms are applied to p yet and the visitor is discarded; purpose unclear.
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+ // Sink under test: STORAGE_WRITE_API, CREATE_IF_NEEDED, 5 s triggering, one STRING column.
+ BigQueryIO.Write<String> writeTransform =
+ BigQueryIO.<String>write()
+ .withFormatFunction(
+ (SerializableFunction<String, TableRow>)
+ input1 -> new TableRow().set("description", input1))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withTriggeringFrequency(org.joda.time.Duration.standardSeconds(5))
+ .to(
+ new TableReference()
+ .setProjectId("project")
+ .setDatasetId("dataset")
+ .setTableId("table"))
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ new ArrayList<>(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("description").setType("STRING")))));
+ // Mark the Create output UNBOUNDED, then apply the sink under the name "StorageWriteApi".
+ p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("StorageWriteApi", writeTransform);
+ // NOTE(review): the translator below is built from default options, not from `options`.
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p,
sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+ // Translate the pipeline into the job specification whose steps are inspected below.
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+ // Passes if any step's USER_NAME property contains "StorageWriteApi/StorageApiLoads".
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME)
+ .contains("StorageWriteApi/StorageApiLoads")) {
+ foundStep = true;
+ }
+ }
+ assertTrue(foundStep);
+ }
+
+ // Test that the transform names added for TextIO writes with autosharding are what we expect.
+ // This is required since the Windmill backend expects the file write autosharded step to contain
+ // that name. For a more stable solution, we should use URN, but that is not currently used in the
+ // legacy
+ // java worker.
+ // TODO:(https://github.com/apache/beam/issues/29338) Pass in URN information to Dataflow Runner.
+ @Test
+ public void testGCSWriteTransformNames() throws IOException, Exception {
+ // NOTE(review): "throws IOException, Exception" is redundant; Exception already covers it.
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline p = Pipeline.create(options);
+ // NOTE(review): no transforms are applied to p yet and the visitor is discarded; purpose unclear.
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+ // UNBOUNDED input in 1 ms fixed windows; withNumShards(0) presumably requests autosharding.
+ p.apply(Create.of("1", "2", "3", "4").withCoder(StringUtf8Coder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply(Window.into(FixedWindows.of(Duration.millis(1))))
+ .apply(
+ "WriteMyFile",
+
TextIO.write().to("gs://bucket/object").withWindowedWrites().withNumShards(0));
+ // NOTE(review): the translator below is built from default options, not from `options`.
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p,
sdkComponents, true);
+ DataflowPipelineTranslator t =
+ DataflowPipelineTranslator.fromOptions(
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+ // Translate the pipeline into the job specification whose steps are inspected below.
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ // Assert that at least one of the steps added was the autosharded write.
This is added to
+ // ensure the name doesn't change.
+ boolean foundStep = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getString(step.getProperties(), PropertyNames.USER_NAME)
+ .contains("WriteFiles/WriteAutoShardedBundlesToTempFiles")) {
+ foundStep = true;
+ }
+ }
+ assertTrue(foundStep);
+ }
+
@Test
public void testNetworkConfig() throws IOException {
final String testNetwork = "test-network";