This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.33.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.33.0 by this push:
     new e0f8f7a  Merge pull request #15454 from ihji/BEAM-12838
     new 3ff6f9c  Merge pull request #15474 from ihji/cherry-pick-12838
e0f8f7a is described below

commit e0f8f7a39bc5057fb2e24923d5ddb0709f0ad0ab
Author: Heejong Lee <[email protected]>
AuthorDate: Tue Sep 7 11:21:36 2021 -0700

    Merge pull request #15454 from ihji/BEAM-12838
    
    [BEAM-12838] Update artifact local path for DataflowRunner Java
---
 .../beam/runners/dataflow/DataflowRunner.java      | 57 +++++++++++++-
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 91 ++++++++++++++++++++++
 2 files changed, 147 insertions(+), 1 deletion(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fa7c1ec..c1220b0 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -109,6 +109,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
@@ -849,6 +850,57 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  @VisibleForTesting
+  protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
+    RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
+    RunnerApi.Components.Builder componentsBuilder = 
pipelineBuilder.getComponentsBuilder();
+    componentsBuilder.clearEnvironments();
+    for (Map.Entry<String, RunnerApi.Environment> entry :
+        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+      RunnerApi.Environment.Builder environmentBuilder = 
entry.getValue().toBuilder();
+      environmentBuilder.clearDependencies();
+      for (RunnerApi.ArtifactInformation info : 
entry.getValue().getDependenciesList()) {
+        if 
(!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn()))
 {
+          throw new RuntimeException(
+              String.format("unsupported artifact type %s", 
info.getTypeUrn()));
+        }
+        RunnerApi.ArtifactFilePayload filePayload;
+        try {
+          filePayload = 
RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact file payload.", 
e);
+        }
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+            .equals(info.getRoleUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact role %s", 
info.getRoleUrn()));
+        }
+        RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+        try {
+          stagingPayload = 
RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact staging_to role 
payload.", e);
+        }
+        environmentBuilder.addDependencies(
+            info.toBuilder()
+                
.setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+                .setTypePayload(
+                    RunnerApi.ArtifactUrlPayload.newBuilder()
+                        .setUrl(
+                            
FileSystems.matchNewResource(options.getStagingLocation(), true)
+                                .resolve(
+                                    stagingPayload.getStagedName(),
+                                    
ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+                                .toString())
+                        .setSha256(filePayload.getSha256())
+                        .build()
+                        .toByteString()));
+      }
+      componentsBuilder.putEnvironments(entry.getKey(), 
environmentBuilder.build());
+    }
+    return pipelineBuilder.build();
+  }
+
   private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
     ImmutableList.Builder<StagedFile> filesToStageBuilder = 
ImmutableList.builder();
     for (Map.Entry<String, RunnerApi.Environment> entry :
@@ -952,6 +1004,10 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
     RunnerApi.Pipeline portablePipelineProto =
         PipelineTranslation.toProto(pipeline, portableComponents, false);
+    // Note that `stageArtifacts` has to be called before `resolveArtifact` 
because
+    // `resolveArtifact` updates local paths to staged paths in pipeline proto.
+    List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
+    portablePipelineProto = resolveArtifacts(portablePipelineProto);
     LOG.debug("Portable pipeline proto:\n{}", 
TextFormat.printToString(portablePipelineProto));
     // Stage the portable pipeline proto, retrieving the staged pipeline path, 
then update
     // the options on the new job
@@ -976,7 +1032,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     RunnerApi.Pipeline dataflowV1PipelineProto =
         PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
     LOG.debug("Dataflow v1 pipeline proto:\n{}", 
TextFormat.printToString(dataflowV1PipelineProto));
-    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);
 
     // Set a unique client_request_id in the CreateJob request.
     // This is used to ensure idempotence of job creation across retried
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 55aa182..f5bde8e 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1179,6 +1179,97 @@ public class DataflowRunnerTest implements Serializable {
   }
 
   @Test
+  public void testResolveArtifacts() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    String stagingLocation = options.getStagingLocation().replaceFirst("/$", 
"");
+    RunnerApi.ArtifactInformation fooLocalArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            
.setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+            .setTypePayload(
+                RunnerApi.ArtifactFilePayload.newBuilder()
+                    .setPath("/tmp/foo.jar")
+                    .build()
+                    .toByteString())
+            
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("foo_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.ArtifactInformation barLocalArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            
.setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+            .setTypePayload(
+                RunnerApi.ArtifactFilePayload.newBuilder()
+                    .setPath("/tmp/bar.jar")
+                    .build()
+                    .toByteString())
+            
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("bar_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.Pipeline pipeline =
+        RunnerApi.Pipeline.newBuilder()
+            .setComponents(
+                RunnerApi.Components.newBuilder()
+                    .putEnvironments(
+                        "env",
+                        RunnerApi.Environment.newBuilder()
+                            .addAllDependencies(
+                                ImmutableList.of(fooLocalArtifact, 
barLocalArtifact))
+                            .build()))
+            .build();
+
+    RunnerApi.ArtifactInformation fooStagedArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+            .setTypePayload(
+                RunnerApi.ArtifactUrlPayload.newBuilder()
+                    .setUrl(stagingLocation + "/foo_staged.jar")
+                    .build()
+                    .toByteString())
+            
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("foo_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.ArtifactInformation barStagedArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+            .setTypePayload(
+                RunnerApi.ArtifactUrlPayload.newBuilder()
+                    .setUrl(stagingLocation + "/bar_staged.jar")
+                    .build()
+                    .toByteString())
+            
.setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("bar_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.Pipeline expectedPipeline =
+        RunnerApi.Pipeline.newBuilder()
+            .setComponents(
+                RunnerApi.Components.newBuilder()
+                    .putEnvironments(
+                        "env",
+                        RunnerApi.Environment.newBuilder()
+                            .addAllDependencies(
+                                ImmutableList.of(fooStagedArtifact, 
barStagedArtifact))
+                            .build()))
+            .build();
+    assertThat(runner.resolveArtifacts(pipeline), equalTo(expectedPipeline));
+  }
+
+  @Test
   public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
     DataflowPipelineOptions options = 
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);

Reply via email to