This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 86ba1181896 [BEAM-14232] Only resolve artifacts in expanded
environments for Java External transform
new d1600823626 Merge pull request #17249 from ihji/BEAM-14232
86ba1181896 is described below
commit 86ba1181896130394d665ca0e135ab026a136328
Author: Heejong Lee <[email protected]>
AuthorDate: Fri Apr 1 10:40:11 2022 -0700
[BEAM-14232] Only resolve artifacts in expanded environments for Java
External transform
---
.../beam/runners/core/construction/External.java | 48 ++++++++++++++--------
1 file changed, 32 insertions(+), 16 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
index ec9f5fcb8c9..5578b0188c9 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
@@ -224,9 +225,10 @@ public class External {
}
}
+ RunnerApi.Components originalComponents = components.toComponents();
ExpansionApi.ExpansionRequest request =
ExpansionApi.ExpansionRequest.newBuilder()
- .setComponents(components.toComponents())
+ .setComponents(originalComponents)
.setTransform(ptransformBuilder.build())
.setNamespace(getNamespace())
.build();
@@ -239,7 +241,20 @@ public class External {
String.format("expansion service error: %s", response.getError()));
}
- expandedComponents = resolveArtifacts(response.getComponents());
+ Map<String, RunnerApi.Environment> newEnvironmentsWithDependencies =
+ response.getComponents().getEnvironmentsMap().entrySet().stream()
+ .filter(
+ kv ->
+
!originalComponents.getEnvironmentsMap().containsKey(kv.getKey())
+ && kv.getValue().getDependenciesCount() != 0)
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ expandedComponents =
+ response
+ .getComponents()
+ .toBuilder()
+
.putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies))
+ .build();
expandedTransform = response.getTransform();
expandedRequirements = response.getRequirementsList();
@@ -281,29 +296,30 @@ public class External {
return toOutputCollection(outputMapBuilder.build());
}
- private RunnerApi.Components resolveArtifacts(RunnerApi.Components
components) {
- if (components.getEnvironmentsMap().values().stream()
- .allMatch(env -> env.getDependenciesCount() == 0)) {
- return components;
+ private Map<String, RunnerApi.Environment> resolveArtifacts(
+ Map<String, RunnerApi.Environment> environments) {
+ if (environments.size() == 0) {
+ return environments;
}
-
ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint.getUrl())
.usePlaintext()
.maxInboundMessageSize(Integer.MAX_VALUE)
.build();
try {
- RunnerApi.Components.Builder componentsBuilder =
components.toBuilder();
ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub
retrievalStub =
ArtifactRetrievalServiceGrpc.newBlockingStub(channel);
- for (Map.Entry<String, RunnerApi.Environment> env :
- componentsBuilder.getEnvironmentsMap().entrySet()) {
- componentsBuilder.putEnvironments(
- env.getKey(), resolveArtifacts(retrievalStub, env.getValue()));
- }
- return componentsBuilder.build();
- } catch (IOException exn) {
- throw new RuntimeException(exn);
+ return environments.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ kv -> {
+ try {
+ return resolveArtifacts(retrievalStub, kv.getValue());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
} finally {
channel.shutdown();
}