lukecwik commented on a change in pull request #11205: [BEAM-9578] Enumerating
artifacts is too expensive in Java
URL: https://github.com/apache/beam/pull/11205#discussion_r402700601
##########
File path:
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
##########
@@ -214,24 +220,87 @@ public static Environment createProcessEnvironment(
pathsToStage.addAll(stagingFiles);
}
- ImmutableList.Builder<ArtifactInformation> filesToStage =
ImmutableList.builder();
+ ImmutableList.Builder<Supplier<ArtifactInformation>> lazyArtifactsBuilder =
+ ImmutableList.builder();
for (String path : pathsToStage) {
File file = new File(path);
- if (new File(path).exists()) {
- // Spurious items get added to the classpath. Filter by just those
that exist.
- if (file.isDirectory()) {
- // Zip up directories so we can upload them to the artifact service.
- try {
- filesToStage.add(createArtifactInformation(zipDirectory(file)));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- filesToStage.add(createArtifactInformation(file));
- }
+ // Spurious items get added to the classpath. Filter by just those that
exist.
+ if (file.exists()) {
+ ArtifactInformation.Builder artifactBuilder =
ArtifactInformation.newBuilder();
+
artifactBuilder.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE));
+
artifactBuilder.setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO));
+ artifactBuilder.setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName(createStagingFileName(file))
+ .build()
+ .toByteString());
+ lazyArtifactsBuilder.add(
+ file.isDirectory()
+ ? () -> {
+ File zippedFile;
+ HashCode hashCode;
+ try {
+ zippedFile = zipDirectory(file);
+ hashCode =
Files.asByteSource(zippedFile).hash(Hashing.sha256());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return artifactBuilder
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath(zippedFile.getPath())
+ .setSha256(hashCode.toString())
+ .build()
+ .toByteString())
+ .build();
+ }
+ : () -> {
+ HashCode hashCode;
+ try {
+ hashCode = Files.asByteSource(file).hash(Hashing.sha256());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return artifactBuilder
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath(file.getPath())
+ .setSha256(hashCode.toString())
+ .build()
+ .toByteString())
+ .build();
+ });
}
}
- return filesToStage.build();
+
+ List<Supplier<ArtifactInformation>> lazyArtifacts =
lazyArtifactsBuilder.build();
+ String id = UUID.randomUUID().toString();
+ DefaultArtifactResolver.INSTANCE.register(
+ (info) -> {
+ if
(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
+ RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
+ try {
+ deferredArtifactPayload =
+
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing deferred artifact
payload.", e);
+ }
+ if (id.equals(deferredArtifactPayload.getKey())) {
+ return
lazyArtifacts.stream().map(Supplier::get).collect(Collectors.toList());
+ } else {
+ return ImmutableList.of();
+ }
+ } else {
+ return ImmutableList.of();
+ }
+ });
+
+ return ImmutableList.of(
+ ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
+ .setTypePayload(
+
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(id).build().toByteString())
Review comment:
```suggestion
String key = UUID.randomUUID().toString();
DefaultArtifactResolver.INSTANCE.register(
(info) -> {
if
(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
try {
deferredArtifactPayload =
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Error parsing deferred artifact
payload.", e);
}
if (key.equals(deferredArtifactPayload.getKey())) {
return
lazyArtifacts.stream().map(Supplier::get).collect(Collectors.toList());
} else {
return ImmutableList.of();
}
} else {
return ImmutableList.of();
}
});
return ImmutableList.of(
ArtifactInformation.newBuilder()
.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
.setTypePayload(
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(key).build().toByteString())
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services