This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch release-2.33.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.33.0 by this push:
new e0f8f7a Merge pull request #15454 from ihji/BEAM-12838
new 3ff6f9c Merge pull request #15474 from ihji/cherry-pick-12838
e0f8f7a is described below
commit e0f8f7a39bc5057fb2e24923d5ddb0709f0ad0ab
Author: Heejong Lee <[email protected]>
AuthorDate: Tue Sep 7 11:21:36 2021 -0700
Merge pull request #15454 from ihji/BEAM-12838
[BEAM-12838] Update artifact local path for DataflowRunner Java
---
.../beam/runners/dataflow/DataflowRunner.java | 57 +++++++++++++-
.../beam/runners/dataflow/DataflowRunnerTest.java | 91 ++++++++++++++++++++++
2 files changed, 147 insertions(+), 1 deletion(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fa7c1ec..c1220b0 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -109,6 +109,7 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
@@ -849,6 +850,57 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
}
}
+ @VisibleForTesting
+ protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
+ RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
+ RunnerApi.Components.Builder componentsBuilder =
pipelineBuilder.getComponentsBuilder();
+ componentsBuilder.clearEnvironments();
+ for (Map.Entry<String, RunnerApi.Environment> entry :
+ pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+ RunnerApi.Environment.Builder environmentBuilder =
entry.getValue().toBuilder();
+ environmentBuilder.clearDependencies();
+ for (RunnerApi.ArtifactInformation info :
entry.getValue().getDependenciesList()) {
+ if
(!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn()))
{
+ throw new RuntimeException(
+ String.format("unsupported artifact type %s",
info.getTypeUrn()));
+ }
+ RunnerApi.ArtifactFilePayload filePayload;
+ try {
+ filePayload =
RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing artifact file payload.",
e);
+ }
+ if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+ .equals(info.getRoleUrn())) {
+ throw new RuntimeException(
+ String.format("unsupported artifact role %s",
info.getRoleUrn()));
+ }
+ RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+ try {
+ stagingPayload =
RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing artifact staging_to role
payload.", e);
+ }
+ environmentBuilder.addDependencies(
+ info.toBuilder()
+
.setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(
+
FileSystems.matchNewResource(options.getStagingLocation(), true)
+ .resolve(
+ stagingPayload.getStagedName(),
+
ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ .toString())
+ .setSha256(filePayload.getSha256())
+ .build()
+ .toByteString()));
+ }
+ componentsBuilder.putEnvironments(entry.getKey(),
environmentBuilder.build());
+ }
+ return pipelineBuilder.build();
+ }
+
private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
ImmutableList.Builder<StagedFile> filesToStageBuilder =
ImmutableList.builder();
for (Map.Entry<String, RunnerApi.Environment> entry :
@@ -952,6 +1004,10 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
RunnerApi.Pipeline portablePipelineProto =
PipelineTranslation.toProto(pipeline, portableComponents, false);
+ // Note that `stageArtifacts` has to be called before `resolveArtifacts`
because
+ // `resolveArtifacts` updates local paths to staged paths in pipeline proto.
+ List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
+ portablePipelineProto = resolveArtifacts(portablePipelineProto);
LOG.debug("Portable pipeline proto:\n{}",
TextFormat.printToString(portablePipelineProto));
// Stage the portable pipeline proto, retrieving the staged pipeline path,
then update
// the options on the new job
@@ -976,7 +1032,6 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
LOG.debug("Dataflow v1 pipeline proto:\n{}",
TextFormat.printToString(dataflowV1PipelineProto));
- List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 55aa182..f5bde8e 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1179,6 +1179,97 @@ public class DataflowRunnerTest implements Serializable {
}
@Test
+ public void testResolveArtifacts() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ String stagingLocation = options.getStagingLocation().replaceFirst("/$",
"");
+ RunnerApi.ArtifactInformation fooLocalArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+
.setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath("/tmp/foo.jar")
+ .build()
+ .toByteString())
+
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("foo_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.ArtifactInformation barLocalArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+
.setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath("/tmp/bar.jar")
+ .build()
+ .toByteString())
+
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("bar_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.Pipeline pipeline =
+ RunnerApi.Pipeline.newBuilder()
+ .setComponents(
+ RunnerApi.Components.newBuilder()
+ .putEnvironments(
+ "env",
+ RunnerApi.Environment.newBuilder()
+ .addAllDependencies(
+ ImmutableList.of(fooLocalArtifact,
barLocalArtifact))
+ .build()))
+ .build();
+
+ RunnerApi.ArtifactInformation fooStagedArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(stagingLocation + "/foo_staged.jar")
+ .build()
+ .toByteString())
+
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("foo_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.ArtifactInformation barStagedArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(stagingLocation + "/bar_staged.jar")
+ .build()
+ .toByteString())
+
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("bar_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.Pipeline expectedPipeline =
+ RunnerApi.Pipeline.newBuilder()
+ .setComponents(
+ RunnerApi.Components.newBuilder()
+ .putEnvironments(
+ "env",
+ RunnerApi.Environment.newBuilder()
+ .addAllDependencies(
+ ImmutableList.of(fooStagedArtifact,
barStagedArtifact))
+ .build()))
+ .build();
+ assertThat(runner.resolveArtifacts(pipeline), equalTo(expectedPipeline));
+ }
+
+ @Test
public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options =
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);