This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ba1181896 [BEAM-14232] Only resolve artifacts in expanded 
environments for Java External transform
     new d1600823626 Merge pull request #17249 from ihji/BEAM-14232
86ba1181896 is described below

commit 86ba1181896130394d665ca0e135ab026a136328
Author: Heejong Lee <[email protected]>
AuthorDate: Fri Apr 1 10:40:11 2022 -0700

    [BEAM-14232] Only resolve artifacts in expanded environments for Java 
External transform
---
 .../beam/runners/core/construction/External.java   | 48 ++++++++++++++--------
 1 file changed, 32 insertions(+), 16 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
index ec9f5fcb8c9..5578b0188c9 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.beam.model.expansion.v1.ExpansionApi;
 import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
 import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
@@ -224,9 +225,10 @@ public class External {
         }
       }
 
+      RunnerApi.Components originalComponents = components.toComponents();
       ExpansionApi.ExpansionRequest request =
           ExpansionApi.ExpansionRequest.newBuilder()
-              .setComponents(components.toComponents())
+              .setComponents(originalComponents)
               .setTransform(ptransformBuilder.build())
               .setNamespace(getNamespace())
               .build();
@@ -239,7 +241,20 @@ public class External {
             String.format("expansion service error: %s", response.getError()));
       }
 
-      expandedComponents = resolveArtifacts(response.getComponents());
+      Map<String, RunnerApi.Environment> newEnvironmentsWithDependencies =
+          response.getComponents().getEnvironmentsMap().entrySet().stream()
+              .filter(
+                  kv ->
+                      
!originalComponents.getEnvironmentsMap().containsKey(kv.getKey())
+                          && kv.getValue().getDependenciesCount() != 0)
+              .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+      expandedComponents =
+          response
+              .getComponents()
+              .toBuilder()
+              
.putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies))
+              .build();
       expandedTransform = response.getTransform();
       expandedRequirements = response.getRequirementsList();
 
@@ -281,29 +296,30 @@ public class External {
       return toOutputCollection(outputMapBuilder.build());
     }
 
-    private RunnerApi.Components resolveArtifacts(RunnerApi.Components 
components) {
-      if (components.getEnvironmentsMap().values().stream()
-          .allMatch(env -> env.getDependenciesCount() == 0)) {
-        return components;
+    private Map<String, RunnerApi.Environment> resolveArtifacts(
+        Map<String, RunnerApi.Environment> environments) {
+      if (environments.size() == 0) {
+        return environments;
       }
-
       ManagedChannel channel =
           ManagedChannelBuilder.forTarget(endpoint.getUrl())
               .usePlaintext()
               .maxInboundMessageSize(Integer.MAX_VALUE)
               .build();
       try {
-        RunnerApi.Components.Builder componentsBuilder = 
components.toBuilder();
         ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub 
retrievalStub =
             ArtifactRetrievalServiceGrpc.newBlockingStub(channel);
-        for (Map.Entry<String, RunnerApi.Environment> env :
-            componentsBuilder.getEnvironmentsMap().entrySet()) {
-          componentsBuilder.putEnvironments(
-              env.getKey(), resolveArtifacts(retrievalStub, env.getValue()));
-        }
-        return componentsBuilder.build();
-      } catch (IOException exn) {
-        throw new RuntimeException(exn);
+        return environments.entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    Map.Entry::getKey,
+                    kv -> {
+                      try {
+                        return resolveArtifacts(retrievalStub, kv.getValue());
+                      } catch (IOException e) {
+                        throw new RuntimeException(e);
+                      }
+                    }));
       } finally {
         channel.shutdown();
       }

Reply via email to