lukecwik commented on a change in pull request #11205: [BEAM-9578] Enumerating artifacts is too expensive in Java
URL: https://github.com/apache/beam/pull/11205#discussion_r402700601
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 ##########
 @@ -214,24 +220,87 @@ public static Environment createProcessEnvironment(
       pathsToStage.addAll(stagingFiles);
     }
 
-    ImmutableList.Builder<ArtifactInformation> filesToStage = 
ImmutableList.builder();
+    ImmutableList.Builder<Supplier<ArtifactInformation>> lazyArtifactsBuilder =
+        ImmutableList.builder();
     for (String path : pathsToStage) {
       File file = new File(path);
-      if (new File(path).exists()) {
-        // Spurious items get added to the classpath. Filter by just those 
that exist.
-        if (file.isDirectory()) {
-          // Zip up directories so we can upload them to the artifact service.
-          try {
-            filesToStage.add(createArtifactInformation(zipDirectory(file)));
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        } else {
-          filesToStage.add(createArtifactInformation(file));
-        }
+      // Spurious items get added to the classpath. Filter by just those that 
exist.
+      if (file.exists()) {
+        ArtifactInformation.Builder artifactBuilder = 
ArtifactInformation.newBuilder();
+        
artifactBuilder.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE));
+        
artifactBuilder.setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO));
+        artifactBuilder.setRolePayload(
+            RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                .setStagedName(createStagingFileName(file))
+                .build()
+                .toByteString());
+        lazyArtifactsBuilder.add(
+            file.isDirectory()
+                ? () -> {
+                  File zippedFile;
+                  HashCode hashCode;
+                  try {
+                    zippedFile = zipDirectory(file);
+                    hashCode = 
Files.asByteSource(zippedFile).hash(Hashing.sha256());
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return artifactBuilder
+                      .setTypePayload(
+                          RunnerApi.ArtifactFilePayload.newBuilder()
+                              .setPath(zippedFile.getPath())
+                              .setSha256(hashCode.toString())
+                              .build()
+                              .toByteString())
+                      .build();
+                }
+                : () -> {
+                  HashCode hashCode;
+                  try {
+                    hashCode = Files.asByteSource(file).hash(Hashing.sha256());
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return artifactBuilder
+                      .setTypePayload(
+                          RunnerApi.ArtifactFilePayload.newBuilder()
+                              .setPath(file.getPath())
+                              .setSha256(hashCode.toString())
+                              .build()
+                              .toByteString())
+                      .build();
+                });
       }
     }
-    return filesToStage.build();
+
+    List<Supplier<ArtifactInformation>> lazyArtifacts = 
lazyArtifactsBuilder.build();
+    String id = UUID.randomUUID().toString();
+    DefaultArtifactResolver.INSTANCE.register(
+        (info) -> {
+          if 
(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
+            RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
+            try {
+              deferredArtifactPayload =
+                  
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
+            } catch (InvalidProtocolBufferException e) {
+              throw new RuntimeException("Error parsing deferred artifact 
payload.", e);
+            }
+            if (id.equals(deferredArtifactPayload.getKey())) {
+              return 
lazyArtifacts.stream().map(Supplier::get).collect(Collectors.toList());
+            } else {
+              return ImmutableList.of();
+            }
+          } else {
+            return ImmutableList.of();
+          }
+        });
+
+    return ImmutableList.of(
+        ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
+            .setTypePayload(
+                
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(id).build().toByteString())
 
 Review comment:
   ```suggestion
       String key = UUID.randomUUID().toString();
       DefaultArtifactResolver.INSTANCE.register(
           (info) -> {
             if 
(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
               RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
               try {
                 deferredArtifactPayload =
                     
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
               } catch (InvalidProtocolBufferException e) {
                 throw new RuntimeException("Error parsing deferred artifact 
payload.", e);
               }
               if (key.equals(deferredArtifactPayload.getKey())) {
                 return 
lazyArtifacts.stream().map(Supplier::get).collect(Collectors.toList());
               } else {
                 return ImmutableList.of();
               }
             } else {
               return ImmutableList.of();
             }
           });
   
       return ImmutableList.of(
           ArtifactInformation.newBuilder()
               .setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
               .setTypePayload(
                   
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(key).build().toByteString())
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to